简介摘要
SOFARPC是蚂蚁金服开源的高可扩展性、高性能、生产级的轻量级Java RPC服务框架,基于Netty网络通信框架提供应用之间的点对点服务调用功能,为应用之间提供点对点远程服务调用功能。SOFARPC服务发布按照编程界面分为两种使用SOFARPC的方式:
1.通过SOFARPC使用:
服务发布过程涉及到RegistryConfig注册中心配置类,ServerConfig 服务运行容器配置类以及ProviderConfig服务发布配置类。
(1)RegistryConfig注册中心配置类
RegistryConfig registryConfig = new RegistryConfig()
.setProtocol("zookeeper")
.setAddress("127.0.0.1:2181")
RegistryConfig表示注册中心,如上声明服务注册中心的地址和端口是127.0.0.1:2181,协议是Zookeeper。
(2)ServerConfig服务运行容器配置类
ServerConfig serverConfig = new ServerConfig()
.setPort(8803)
.setProtocol("bolt");
ServerConfig表示服务运行容器,如上声明一个使用8803端口和bolt 协议的server。
(3)ProviderConfig服务发布配置类
ProviderConfig<HelloWorldService> providerConfig = new ProviderConfig<HelloWorldService>()
.setInterfaceId(HelloWorldService.class.getName())
.setRef(new HelloWorldServiceImpl())
.setServer(serverConfig)
.setRegistry(registryConfig);
providerConfig.export();
ProviderConfig表示服务发布,如上声明服务的接口,实现和该服务运行的server,最终通过export方法将此服务发布出去。
SOFARPC服务发布支持如下特性:
(1)同一服务发布bolt,rest,dubbo多种协议,构建多个ServerConfig设置给ProviderConfig:
List<ServerConfig> serverConfigs = new ArrayList<ServerConfig>();
serverConfigs.add(serverConfigA);
serverConfigs.add(serverConfigB);
providerConfig.setServer(serverConfigs);
(2)同一服务注册多个注册中心,构建多个RegistryConfig设置给 ProviderConfig:
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
registryConfigs.add(registryA);
registryConfigs.add(registryB);
providerConfig.setRegistry(registryConfigs);
(3)提供MethodConfig进行方法级别参数设置,API方式使用相应的对象set法即可为其设置参数:
MethodConfig methodConfigA = new MethodConfig();
MethodConfig methodConfigB = new MethodConfig();
List<MethodConfig> methodConfigs = new ArrayList<MethodConfig>();
methodConfigs.add(methodConfigA);
methodConfigs.add(methodConfigB);
providerConfig.setMethods(methodConfigs); //服务端设置
2.通过SOFABoot使用:
服务发布使用XML配置通过sofa:service元素表示发布服务,XML配置如下所示就能够发布SOFARPC服务:
<bean id="helloSyncServiceImpl" class="com.alipay.sofa.rpc.samples.invoke.HelloSyncServiceImpl"/>
<sofa:service ref="helloSyncServiceImpl" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
<sofa:binding.bolt/>
</sofa:service>
如上声明服务实现helloSyncServiceImpl,通过sofa:service元素将该服务发布,其中ref属性表示发布的服务实例,interface属性表示该服务的接口。而sofa:binding.bolt元素表示该服务会提供 bolt 协议调用通道。
当SOFARPC应用启动的时候发现当前应用需要发布RPC服务的话,SOFARPC将这些服务注册到服务注册中心上面。服务发布将该服务实例注册到对应协议的server上,并且发布该服务的元数据信息到注册中心。
一个服务也可以通过多种协议进行发布,并且通过sofa:method元素表示方法级别配置,XML配置如下所示:
<sofa:service ref="helloSyncServiceImpl" interface="com.alipay.sofa.rpc.samples.invoke.HelloSyncService">
<sofa:binding.bolt/>
<sofa:binding.rest/>
<sofa:binding.dubbo/>
</sofa:service>
<sofa:service ref="sampleFacadeImpl" interface="com.alipay.sofa.rpc.bean.SampleFacade">
<sofa:binding.bolt>
<sofa:global-attrs timeout="3000"/>
<sofa:method name="sayName" timeout="2000"/>
</sofa:binding.bolt>
</sofa:service>
源码解析
搭建环境服务发布示例:
package org.alipay.sofa.rpc;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
public class RpcServer {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12800) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端
providerConfig.export(); // 发布服务
}
}
参考SOFARPC Example示例模块(com.alipay.sofa.rpc.quickstart.QuickStartServer
):
package com.alipay.sofa.rpc.quickstart;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
/**
* Quick Start Server
*/
public class QuickStartServer {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12000) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端
providerConfig.export(); // 发布服务
}
}
运行服务发布端示例类QuickStartServer查看提供端运行效果,服务提供者输出日志如下:
Connected to the target VM, address: '127.0.0.1:59510', transport: 'socket'
2018-05-14 15:34:11,927 main INFO [com.alipay.sofa.rpc.context.RpcRuntimeContext:info:102] - Welcome! Loading SOFA RPC Framework : 5.4.0_20180427231325, PID is:1952
2018-05-14 15:34:14,752 main INFO [com.alipay.sofa.rpc.module.ModuleFactory:info:102] - Install Module: fault-tolerance
2018-05-14 15:34:54,347 main INFO [com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap:infoWithApp:122] - Export provider config : com.alipay.sofa.rpc.quickstart.HelloService: with bean id rpc-cfg-0
Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ]
2018-05-14 15:35:59,083 main INFO [com.alipay.sofa.common.log:report:30] - Sofa-Middleware-Log SLF4J : Actual binding is of type [ com.alipay.remoting Log4j ]
SOFARPC服务发布流程:
(1)创建服务运行容器配置类ServerConfig,设置ServerConfig实例监听端口、通信协议以及是否为守护线程(是否hold住端口,true的话随主线程退出而退出,false的话则要主动退出)等基础配置信息,当然ServerConfig除自定义设置基础配置以外,类加载自动加载配置文件获取服务运行容器默认配置:
/**
* 服务端配置
*
*/
public class ServerConfig extends AbstractIdConfig implements Serializable
服务运行容器配置类ServerConfig继承默认配置带ID配置类 AbstractIdConfig,服务端配置ServerConfig空构造器没有自动加载默认配置,如何实现自动通过配置文件加载服务端默认配置信息?默认配置带ID类AbstractIdConfig静态代码块调用全局的运行时上下文类RpcRuntimeContext的now()方法,通过运行时上下文类RpcRuntimeContext静态代码模块在类加载的时候读取配置文件设置服务端类ServerConfig默认配置属性:
static {
RpcRuntimeContext.now();
}
static {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);
}
put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
// 初始化一些上下文
initContext();
// 初始化其它模块
ModuleFactory.installModules();
// 增加jvm关闭事件
if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");
}
destroy(false);
}
}, "SOFA-RPC-ShutdownHook"));
}
}
全局运行时上下文类RpcRuntimeContext首先初始化上下文自动部署的appId、appName、appInsId以及当前所在文件夹地址appPath信息:
/**
* 初始化一些上下文
*/
private static void initContext() {
putIfAbsent(KEY_APPID, RpcConfigs.getOrDefaultValue(APP_ID, null));
putIfAbsent(KEY_APPNAME, RpcConfigs.getOrDefaultValue(APP_NAME, null));
putIfAbsent(KEY_APPINSID, RpcConfigs.getOrDefaultValue(INSTANCE_ID, null));
putIfAbsent(KEY_APPAPTH, System.getProperty("user.dir"));
}
其中通过配置加载器和操作入口RpcConfigs静态代码块在类加载的时候加载rpc-config-default.json、sofa-rpc/rpc-config.json、META-INF/sofa-rpc/rpc-config.json配置文件获取默认配置、自定义配置信息自定义配置信息存储在全部配置CFG:
static {
init(); // 加载配置文件
}
private static void init() {
try {
// loadDefault
String json = FileUtils.file2String(RpcConfigs.class, "rpc-config-default.json", "UTF-8");
Map map = JSON.parseObject(json, Map.class);
CFG.putAll(map);
// loadCustom
loadCustom("sofa-rpc/rpc-config.json");
loadCustom("META-INF/sofa-rpc/rpc-config.json");
// load system properties
CFG.putAll(new HashMap(System.getProperties())); // 注意部分属性可能被覆盖为字符串
} catch (Exception e) {
throw new SofaRpcRuntimeException("Catch Exception when load RpcConfigs", e);
}
}
接着扩展加载器ExtensionLoader根据需要被加载的模块列表遍历判断是否需要加载来安装模板初始化其他模块:
/**
* 加载全部模块
*/
public static void installModules() {
ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
String moduleName = o.getKey();
Module module = o.getValue().getExtInstance();
// judge need load from rpc option
if (needLoad(moduleLoadList, moduleName)) {
// judge need load from implement
if (module.needLoad()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Install Module: {}", moduleName);
}
module.install();
INSTALLED_MODULES.put(moduleName, module);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " does not need to be loaded.");
}
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " is not in the module load list.");
}
}
}
}
最后根据是否主动监听JVM关闭事件(默认为true)增加JVM关闭事件监听执行反注册服务端,关闭启动端口、发布服务、调用服务、注册中心、客户端公共资源,卸载模块、钩子以及清理缓存销毁操作:
private static void destroy(boolean active) {
RpcRunningState.setShuttingDown(true);
for (Destroyable.DestroyHook destroyHook : DESTROY_HOOKS) {
destroyHook.preDestroy();
}
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderBootstrap bootstrap : EXPORTED_PROVIDER_CONFIGS) {
providerConfigs.add(bootstrap.getProviderConfig());
}
// 先反注册服务端
List<Registry> registries = RegistryFactory.getRegistries();
if (CommonUtils.isNotEmpty(registries) && CommonUtils.isNotEmpty(providerConfigs)) {
for (Registry registry : registries) {
registry.batchUnRegister(providerConfigs);
}
}
// 关闭启动的端口
ServerFactory.destroyAll();
// 关闭发布的服务
for (ProviderBootstrap bootstrap : EXPORTED_PROVIDER_CONFIGS) {
bootstrap.unExport();
}
// 关闭调用的服务
for (ConsumerBootstrap bootstrap : REFERRED_CONSUMER_CONFIGS) {
ConsumerConfig config = bootstrap.getConsumerConfig();
if (!CommonUtils.isFalse(config.getParameter(RpcConstants.HIDDEN_KEY_DESTROY))) { // 除非不让主动unrefer
bootstrap.unRefer();
}
}
// 关闭注册中心
RegistryFactory.destroyAll();
// 关闭客户端的一些公共资源
ClientTransportFactory.closeAll();
// 卸载模块
if (!RpcRunningState.isUnitTestMode()) {
ModuleFactory.uninstallModules();
}
// 卸载钩子
for (Destroyable.DestroyHook destroyHook : DESTROY_HOOKS) {
destroyHook.postDestroy();
}
// 清理缓存
RpcCacheManager.clearAll();
RpcRunningState.setShuttingDown(false);
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("SOFA RPC Framework has been release all resources {}...",
active ? "actively " : "");
}
}
(2)创建服务发布配置类ProviderConfig,设置ProviderConfig实例接口名称(服务接口:做为服务唯一标识的组成部分)、接口实现类引用以及配置的协议列表即指定服务运行容器配置:
/**
* 服务提供者配置
*
* @param <T> the type parameter
*/
public class ProviderConfig<T> extends AbstractInterfaceConfig<T, ProviderConfig<T>> implements Serializable
服务发布配置类ProviderConfig继承接口级公共配置类AbstractInterfaceConfig,能够通过集成的注册中心更新服务发布接口级别配置例如超时时间、权重等。
(3)服务发布配置类ProviderConfig负责加载更新服务发布接口级配置,根据面向对象单一职责原则,需要绑定服务提供者启动类ProviderBootstrap进行发布服务:
/**
* 发布服务
*/
public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}
首先判断服务提供者启动类ProviderBootstrap是否为空,通过发布服务辅助工具类Bootstraps根据绑定服务发布配置扩展加载工厂ExtensionLoaderFactory加载初始化ProviderBootstrap实例:
/**
* 发布一个服务
*
* @param providerConfig 服务发布者配置
* @param <T> 接口类型
* @return 发布启动类
*/
public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
String bootstrap = providerConfig.getBootstrap();
if (StringUtils.isEmpty(bootstrap)) {
// Use default provider bootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
providerConfig.setBootstrap(bootstrap);
}
ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
return (ProviderBootstrap<T>) providerBootstrap;
}
接着发布服务包装类ProviderBootstrap通过export()方法发布服务,ProviderBootstrap基于Bolt、Rest、Dubbo网络通信协议提供三种协议服务发布实现类:BoltProviderBootstrap、RestProviderBootstrap以及DubboProviderBootstrap。默认服务提供者启动器DefaultProviderBootstrap发布服务export()方法首先判断服务发布延迟时间配置(默认0,配置为-1代表Spring加载完毕)是否大于0,启动线程睡眠指定延迟时间延迟加载,接着调用doExport()方法执行SOFARPC发布服务逻辑:
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}
private void doExport() {
if (exported) {
return;
}
String key = providerConfig.buildKey();
String appName = providerConfig.getAppName();
// 检查参数
checkParameters();
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}
// 注意同一interface,同一uniqleId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
cnt.decrementAndGet();
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}
try {
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册序列化接口
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
}
// 注册到注册中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
cnt.decrementAndGet();
if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
throw new SofaRpcRuntimeException("Build provider proxy error!", e);
}
}
// 记录一些缓存数据
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
默认服务提供者启动器DefaultProviderBootstrap发布服务doExport()方法执行逻辑:
(1)根据服务发布配置ProviderConfig获取服务接口、服务标签构建key以及获取应用名称appName;
(2)调用checkParameters()方法检查参数,包括检查注入的ref是否接口实现类、检查服务运行容器配置是否为空、检查方法及其黑白名单:
/**
* for check fields and parameters of consumer config
*/
protected void checkParameters() {
// 检查注入的ref是否接口实现类
Class proxyClass = providerConfig.getProxyClass();
String key = providerConfig.buildKey();
T ref = providerConfig.getRef();
if (!proxyClass.isInstance(ref)) {
throw ExceptionUtils.buildRuntime("provider.ref",
ref == null ? "null" : ref.getClass().getName(),
"This is not an instance of " + providerConfig.getInterfaceId()
+ " in provider config with key " + key + " !");
}
// server 不能为空
if (CommonUtils.isEmpty(providerConfig.getServer())) {
throw ExceptionUtils.buildRuntime("server", "NULL", "Value of \"server\" is not specified in provider" +
" config with key " + key + " !");
}
checkMethods(proxyClass);
}
/**
* 检查方法,例如方法名、多态(重载)方法
*
* @param itfClass 接口类
*/
protected void checkMethods(Class<?> itfClass) {
ConcurrentHashMap<String, Boolean> methodsLimit = new ConcurrentHashMap<String, Boolean>();
for (Method method : itfClass.getMethods()) {
String methodName = method.getName();
if (methodsLimit.containsKey(methodName)) {
// 重名的方法
if (LOGGER.isWarnEnabled(providerConfig.getAppName())) {
LOGGER.warnWithApp(providerConfig.getAppName(), "Method with same name \"" + itfClass.getName()
+ "." + methodName + "\" exists ! The usage of overloading method in rpc is deprecated.");
}
}
// 判断服务下方法的黑白名单
Boolean include = methodsLimit.get(methodName);
if (include == null) {
include = inList(providerConfig.getInclude(), providerConfig.getExclude(), methodName); // 检查是否在黑白名单中
methodsLimit.putIfAbsent(methodName, include);
}
providerConfig.setMethodsLimit(methodsLimit);
}
}
(3)检验同一个服务(服务接口&服务标签相同)的发布次数是否超过服务引用配置的最大次数,超过最大数量直接抛出异常;
(4)根据服务发布配置构造请求调用器ProviderProxyInvoker,构造服务端调用链最底层是调用过滤器执行链filterChain:
/**
* 构造执行链
*
* @param providerConfig 服务端配置
*/
public ProviderProxyInvoker(ProviderConfig providerConfig) {
this.providerConfig = providerConfig;
// 最底层是调用过滤器
this.filterChain = FilterChain.buildProviderChain(providerConfig,
new ProviderInvoker(providerConfig));
}
服务端调用链入口ProviderProxyInvoker根据服务发布配置通过服务端调用业务实现类ProviderInvoker初始化过滤器包装Invoker对象(隔离Filter和Service的关系,filter是单例)指定无需下一层过滤器,按照自动装载扩展实现逻辑,获取用户new实例方式注入的过滤器(优先级高),判断是否需要排除系统过滤器,解析用户通过别名方式注入的过滤器准备数据,解析自动加载的过滤器加入自定义的过滤器构造执行链:
/**
* 构造服务端的执行链
*
* @param providerConfig provider配置
* @param lastFilter 最后一个filter
* @return filter执行链
*/
public static FilterChain buildProviderChain(ProviderConfig<?> providerConfig, FilterInvoker lastFilter) {
/*
* 例如自动装载扩展 A(a),B(b),C(c) filter=[-a,d] filterRef=[new E, new Exclude(b)]
* 逻辑如下:
* 1.解析config.getFilterRef(),记录E和-b
* 2.解析config.getFilter()字符串,记录 d 和 -a,-b
* 3.再解析自动装载扩展,a,b被排除了,所以拿到c,d
* 4.对c d进行排序
* 5.拿到C、D实现类
* 6.加上自定义,返回C、D、E
*/
// 用户通过自己new实例的方式注入的filter,优先级高
List<Filter> customFilters = providerConfig.getFilterRef() == null ?
new ArrayList<Filter>() : new CopyOnWriteArrayList<Filter>(providerConfig.getFilterRef());
// 先解析是否有特殊处理
HashSet<String> excludes = parseExcludeFilter(customFilters);
// 准备数据:用户通过别名的方式注入的filter,需要解析
List<ExtensionClass<Filter>> extensionFilters = new ArrayList<ExtensionClass<Filter>>();
List<String> filterAliases = providerConfig.getFilter(); //
if (CommonUtils.isNotEmpty(filterAliases)) {
for (String filterAlias : filterAliases) {
if (startsWithExcludePrefix(filterAlias)) { // 排除用的特殊字符
excludes.add(filterAlias.substring(1));
} else {
ExtensionClass<Filter> filter = EXTENSION_LOADER.getExtensionClass(filterAlias);
if (filter != null) {
extensionFilters.add(filter);
}
}
}
}
// 解析自动加载的过滤器
if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) { // 配了-*和-default表示不加载内置
for (Map.Entry<String, ExtensionClass<Filter>> entry : PROVIDER_AUTO_ACTIVES.entrySet()) {
if (!excludes.contains(entry.getKey())) {
extensionFilters.add(entry.getValue());
}
}
}
excludes = null; // 不需要了
// 按order从小到大排序
if (extensionFilters.size() > 1) {
Collections.sort(extensionFilters, new OrderedComparator<ExtensionClass<Filter>>());
}
List<Filter> actualFilters = new ArrayList<Filter>();
for (ExtensionClass<Filter> extensionFilter : extensionFilters) {
actualFilters.add(extensionFilter.getExtInstance());
}
// 加入自定义的过滤器
actualFilters.addAll(customFilters);
return new FilterChain(actualFilters, lastFilter, providerConfig);
}
/**
* 构造执行链
*
* @param filters 包装过滤器列表
* @param lastInvoker 最终过滤器
* @param config 接口配置
*/
protected FilterChain(List<Filter> filters, FilterInvoker lastInvoker, AbstractInterfaceConfig config) {
// 调用过程外面包装多层自定义filter
// 前面的过滤器在最外层
invokerChain = lastInvoker;
if (CommonUtils.isNotEmpty(filters)) {
loadedFilters = new ArrayList<Filter>();
for (int i = filters.size() - 1; i >= 0; i--) {
try {
Filter filter = filters.get(i);
if (filter.needToLoad(invokerChain)) {
invokerChain = new FilterInvoker(filter, invokerChain, config);
// cache this for filter when async respond
loadedFilters.add(filter);
}
} catch (Exception e) {
LOGGER.error("Error when build filter chain", e);
throw new SofaRpcRuntimeException("Error when build filter chain", e);
}
}
}
}
(5)判断服务配置是否注册提前初始化注册中心,注册中心配置类RegistryConfig根据注册中心配置获取注册中心实现类Registry实例,注册中心实现类Registry包括基于本地文件注册中心LocalRegistry和基于Zookeeper分布式协调系统注册中心ZookeeperRegistry:
/**
* 得到注册中心对象
*
* @param registryConfig RegistryConfig类
* @return Registry实现
*/
public static synchronized Registry getRegistry(RegistryConfig registryConfig) {
if (ALL_REGISTRIES.size() > 3) { // 超过3次 是不是配错了?
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Size of registry is greater than 3, Please check it!");
}
}
try {
// 注意:RegistryConfig重写了equals方法,如果多个RegistryConfig属性一样,则认为是一个对象
Registry registry = ALL_REGISTRIES.get(registryConfig);
if (registry == null) {
ExtensionClass<Registry> ext = ExtensionLoaderFactory.getExtensionLoader(Registry.class)
.getExtensionClass(registryConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("registry.protocol", registryConfig.getProtocol(),
"Unsupported protocol of registry config !");
}
registry = ext.getExtInstance(new Class[] { RegistryConfig.class }, new Object[] { registryConfig });
ALL_REGISTRIES.put(registryConfig, registry);
}
return registry;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
(6)根据服务发布配置ProviderConfig获取服务运行容器配置列表,遍历服务运行容器配置构建启动服务运行容器Server实例,服务运行容器Server包括基于Bolt协议的BoltServer和基于Rest协议的RestServer:
/**
* 启动服务
*
* @return the server
*/
public synchronized Server buildIfAbsent() {
if (server != null) {
return server;
}
// 提前检查协议+序列化方式
// ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
// SerializationType.valueOf(getSerialization()));
server = ServerFactory.getServer(this);
return server;
}
通过服务运行容器工厂ServerFactory的getServer()方法获取启动基于当前服务运行容器配置ServerConfig的服务端Server实例,首先按照服务运行容器配置确定服务端Server的Host和Port,扩展加载器工厂ExtensionLoaderFactory生成服务运行容器Server实例,初始化启动服务端Server,缓存服务端Server到SERVER_MAP:
/**
* 初始化Server实例
*
* @param serverConfig 服务端配置
* @return Server
*/
public synchronized static Server getServer(ServerConfig serverConfig) {
try {
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
if (server == null) {
// 算下网卡和端口
resolveServerConfig(serverConfig);
ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol());
if (ext == null) {
throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
"Unsupported protocol of server!");
}
server = ext.getExtInstance();
server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
}
return server;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
默认服务运行容器BoltServer启动Server端设置服务运行容器配置引用,按照服务端业务线程池核心线程数、最大线程数、线程队列以及回收时间配置初始化业务线程池提供给RpcHandler 处理复杂业务使用,不会影响Netty IO线程运行。根据服务运行容器BoltServer创建BoltServerProcessor处理器并且支持自定义业务线程池,BoltServerProcessor处理器继承AbstractUserProcessor类实现UserProcessor接口,当调用RpcHandler处理器的channelRead(ChannelHandlerContext ctx, Object msg)方法时候,根据请求类名称获取对应的UserProcessor执行handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request)方法查找服务调用器、请求方法实施调用方法处理请求:
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
// 启动线程池
bizThreadPool = initThreadPool(serverConfig);
boltServerProcessor = new BoltServerProcessor(this);
}
protected ThreadPoolExecutor initThreadPool(ServerConfig serverConfig) {
ThreadPoolExecutor threadPool = BusinessPool.initPool(serverConfig);
threadPool.setThreadFactory(new NamedThreadFactory(
"BOLT-BIZ-" + serverConfig.getPort(), serverConfig.isDaemon()));
threadPool.setRejectedExecutionHandler(new SofaRejectedExecutionHandler());
if (serverConfig.isPreStartCore()) { // 初始化核心线程池
threadPool.prestartAllCoreThreads();
}
return threadPool;
}
/**
* Construct
*
* @param boltServer 所在的Server
*/
public BoltServerProcessor(BoltServer boltServer) {
this.boltServer = boltServer;
this.executorSelector = new UserThreadPoolSelector(); // 支持自定义业务线程池
}
/**
* Bolt server processor of bolt server.
*
*/
public class BoltServerProcessor extends AsyncUserProcessor<SofaRequest>{...}
public abstract class AsyncUserProcessor<T> extends AbstractUserProcessor<T>{...}
public abstract class AbstractUserProcessor<T> implements UserProcessor<T>{...}
public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService defaultExecutor) throws Exception {
if (this.deserializeRequestCommand(ctx, cmd, 0)) {
UserProcessor userProcessor = ctx.getUserProcessor(cmd.getRequestClass());
if (userProcessor == null) {
String errMsg = "No user processor found for request: " + cmd.getRequestClass();
logger.error(errMsg);
this.sendResponseIfNecessary(ctx, cmd.getType(), this.getCommandFactory().createExceptionResponse(cmd.getId(), errMsg));
} else {
ctx.setTimeoutDiscard(userProcessor.timeoutDiscard());
if (userProcessor.processInIOThread()) {
if (this.deserializeRequestCommand(ctx, cmd, 2)) {
(new RpcRequestProcessor.ProcessTask(ctx, cmd)).run();
}
} else {
Executor executor = null;
if (null == userProcessor.getExecutorSelector()) {
executor = userProcessor.getExecutor();
} else {
if (!this.deserializeRequestCommand(ctx, cmd, 1)) {
return;
}
executor = userProcessor.getExecutorSelector().select(cmd.getRequestClass(), cmd.getRequestHeader());
}
if (executor == null) {
executor = this.getExecutor() == null ? defaultExecutor : this.getExecutor();
}
((Executor)executor).execute(new RpcRequestProcessor.ProcessTask(ctx, cmd));
}
}
}
}
什么时候调用BoltServerProcessor处理器handleRequest方法执行方法调用处理请求?查看BoltServerProcessor处理器handleRequest方法调用链:
BoltServerProcessor处理器handleRequest方法调用链
BoltServerProcessor处理器handleRequest方法调用链顶端是通过RpcRequestProcessor处理器ProcessTask处理任务调用,ProcessTask任务提交给线程池处理,查看ProcessTask构造方法调用链:
ProcessTask构造方法调用链
ProcessTask构造方法调用链顶端是通过RpcHandler处理器的channelRead方法将消息分发给相应的协议,什么时候使用默认服务端BoltServer业务线程池bizThreadPool处理复杂业务?查看BoltServer服务端getBizThreadPool方法调用链:BoltServer服务端getBizThreadPool方法调用链 BoltServer服务端getBizThreadPool方法最终也是通过RpcHandler处理器的channelRead方法调用,即获取服务端BoltServer业务线程池bizThreadPool,提交RpcRequestProcessor处理器ProcessTask任务,执行BoltServerProcessor处理器handleRequest方法处理请求。
(7)服务端Server实例注册服务提供者配置、服务端调用实现,默认服务运行容器BoltServer缓存服务端调用Invoker对象到接口--> Invoker映射invokerMap以及服务发布接口方法:
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
// 缓存Invoker对象
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
invokerMap.put(key, instance);
// 缓存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
}
(8)判断服务端配置ServerConfig是否自动启动,启动服务运行容器Server实例:
public void start() {
if (started) {
return;
}
synchronized (this) {
if (started) {
return;
}
// 生成Server对象
remotingServer = initRemotingServer();
try {
if (!remotingServer.start(serverConfig.getBoundHost())) {
throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
started = true;
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}
默认服务端BoltServer启动服务首先以服务端配置端口通过RpcServer初始化生成远程服务端Server实例,远程服务端Server将BoltServerProcessor处理器缓存到userProcessors:
protected RemotingServer initRemotingServer() {
// 绑定到端口
RemotingServer remotingServer = new RpcServer(serverConfig.getPort());
remotingServer.registerUserProcessor(boltServerProcessor);
return remotingServer;
}
public RpcServer(int port) {
super(port);
this.globalSwitch = new GlobalSwitch();
this.connectionEventListener = new ConnectionEventListener();
this.userProcessors = new ConcurrentHashMap(4);
this.bossGroup = new NioEventLoopGroup(1, new NamedThreadFactory("Rpc-netty-server-boss"));
}
public void registerUserProcessor(UserProcessor<?> processor) {
if (processor != null && !StringUtils.isBlank(processor.interest())) {
UserProcessor<?> preProcessor = (UserProcessor)this.userProcessors.putIfAbsent(processor.interest(), processor);
if (preProcessor != null) {
String errMsg = "Processor with interest key [" + processor.interest() + "] has already been registered to rpc server, can not register again!";
throw new RuntimeException(errMsg);
}
} else {
throw new RuntimeException("User processor or processor interest should not be blank!");
}
}
接着远程服务端Server按照服务运行容器绑定Host配置启动服务,执行init()方法初始化RpcRemoting远程连接事件处理器以及连接管理器,初始化服务启动类ServerBootstrap实例,设置服务启动类ServerBootstrap属性,根据缓存BoltServerProcessor的userProcessors创建RpcHandler处理器并且添加到ChannelPipeline,创建Rpc服务连接,执行doStart()方法服务启动类ServerBootstrap实例绑定指定IP地址同步:
public boolean start(String ip) {
this.init();
if (this.started.compareAndSet(false, true)) {
try {
logger.warn("Server started on " + ip + ":" + this.port);
return this.doStart(ip);
} catch (Throwable var3) {
this.started.set(false);
logger.error("ERROR: Failed to start the Server!", var3);
return false;
}
} else {
logger.error("ERROR: The server has already started!");
return false;
}
}
protected void doInit() {
if (this.addressParser == null) {
this.addressParser = new RpcAddressParser();
}
this.initRpcRemoting((RpcRemoting)null);
this.bootstrap = new ServerBootstrap();
((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)this.bootstrap.group(this.bossGroup, workerGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.SO_BACKLOG, SystemProperties.tcp_so_backlog())).option(ChannelOption.SO_REUSEADDR, SystemProperties.tcp_so_reuseaddr())).childOption(ChannelOption.TCP_NODELAY, SystemProperties.tcp_nodelay()).childOption(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());
this.initWriteBufferWaterMark();
boolean pooledBuffer = SystemProperties.netty_buffer_pooled();
if (pooledBuffer) {
((ServerBootstrap)this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
final boolean idleSwitch = SystemProperties.tcp_idle_switch();
final int idleTime = SystemProperties.tcp_server_idle();
final ChannelHandler serverIdleHandler = new ServerIdleHandler();
final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new RpcProtocolDecoder(1));
pipeline.addLast("encoder", new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(new byte[]{2})));
if (idleSwitch) {
pipeline.addLast("idleStateHandler", new IdleStateHandler(0L, 0L, (long)idleTime, TimeUnit.MILLISECONDS));
pipeline.addLast("serverIdleHandler", serverIdleHandler);
}
pipeline.addLast("connectionEventHandler", RpcServer.this.connectionEventHandler);
pipeline.addLast("handler", rpcHandler);
this.createConnection(channel);
}
private void createConnection(SocketChannel channel) {
Url url = RpcServer.this.addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
if (RpcServer.this.globalSwitch.isOn(2)) {
RpcServer.this.connectionManager.add(new Connection(channel, url), url.getUniqueKey());
} else {
new Connection(channel, url);
}
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
}
});
}
protected boolean doStart() throws InterruptedException {
this.channelFuture = this.bootstrap.bind(new InetSocketAddress(this.port)).sync();
return this.channelFuture.isSuccess();
}
(9)服务发布配置ProviderConfig实例绑定Provider配置发生变化监听器ProviderAttributeListener,监听Provider属性变化更新服务发布配置属性:
/**
* Provider配置发生变化监听器
*/
private class ProviderAttributeListener implements ConfigListener {
@Override
public void configChanged(Map newValue) {
}
@Override
public synchronized void attrUpdated(Map newValueMap) {
String appName = providerConfig.getAppName();
// 可以改变的配置 例如tag concurrents等
Map<String, String> newValues = (Map<String, String>) newValueMap;
Map<String, String> oldValues = new HashMap<String, String>();
boolean reexport = false;
try { // 检查是否有变化
// 是否过滤map?
for (Map.Entry<String, String> entry : newValues.entrySet()) {
String newValue = entry.getValue();
String oldValue = providerConfig.queryAttribute(entry.getKey());
boolean changed = oldValue == null ? newValue != null : !oldValue.equals(newValue);
if (changed) {
oldValues.put(entry.getKey(), oldValue);
}
reexport = reexport || changed;
}
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when provider attribute compare", e);
return;
}
// 需要重新发布
if (reexport) {
try {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Reexport service {}", providerConfig.buildKey());
}
unExport();
// change attrs
for (Map.Entry<String, String> entry : newValues.entrySet()) {
providerConfig.updateAttribute(entry.getKey(), entry.getValue(), true);
}
export();
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when provider attribute changed", e);
//rollback old attrs
for (Map.Entry<String, String> entry : oldValues.entrySet()) {
providerConfig.updateAttribute(entry.getKey(), entry.getValue(), true);
}
export();
}
}
}
}
(10)注册订阅服务列表,判断服务发布配置是否注册,根据服务发布注册中心配置列表遍历注册中心配置通过注册中心工厂获取注册中心实例并且初始化启动注册中心,注册服务发布配置到注册中心:
/**
* 订阅服务列表
*/
protected void register() {
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (registryConfigs != null) {
for (RegistryConfig registryConfig : registryConfigs) {
Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();
try {
registry.register(providerConfig);
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = providerConfig.getAppName();
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.warnWithApp(appName, "Catch exception when register to registry: "
+ registryConfig.getId(), e);
}
}
}
}
}
}
(11)全局运行时上下文RpcRuntimeContext增加缓存ProviderConfig,缓存服务发布配置到EXPORTED_PROVIDER_CONFIGS集合。
解析总结
SOFARPC服务发布流程概括为SOFARPC服务需要创建服务运行容器配置ServerConfig,自定义设置基础配置并且通过配置文件加载服务端默认配置;创建服务发布配置ProviderConfig,自定义设置接口名称、接口实现类引用以及指定服务端配置;服务发布启动类ProviderBootstrap发布服务:构造请求调用器ProviderProxyInvoker(最底层调用过滤器执行链FilterChain),提前初始化注册中心,创建服务端Server包括启动业务线程池bizThreadPool 、创建BoltServerProcessor处理器,服务端Server注册服务提供者配置ProviderConfig、服务端调用实现ProviderProxyInvoker,服务端Server启动服务创建RpcServer,将BoltServerProcessor处理器缓存到userProcessors,初始化服务启动ServerBootstrap,根据userProcessors创建RpcHandler处理器添加到ChannelPipeline,创建RPC服务连接完成启动服务链路。RPC服务请求调用RpcHandler的channelRead()方法获取UserProcessor执行handlerRequest()方法查找服务调用器Invoker实施调用过滤器执行链FilterChain方法调用。
网友评论