在dubbo中服务的暴露的逻辑起源于ServiceBean这个类,而这个类的注册激活在spring中可以通过标签也可以通过xml的配置方式进行。下面讲解这两种方式不同之处。
1.xml的方式
在这里讲解GitHub上面最新的版本的Dubbo4.3.16版本
对spring研究过的肯定知道,spring可以通过自定xsd文件来扩展自定义标签的。对应的标签解析类定义在spring.handlers文件中。关于自定义XSD扩展在前面的Spring源码解析中讲解过,也进行过举例,自定义标签的使用
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
Dubbo也对Spring进行兼容,所以我们能通过配置文件找到了DubboNamespaceHandler类
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
//校验是否有多个不同版本的dubbo包
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
//在这里在注册不同的配置元素对应的解析器。
@Override
public void init() {
....
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
....
}
}
2.标签的方式
在前面已经讲解过@EnableDubbo这个标签已经对应的@Service和@Reference标签的解析,而在@service标签的解析的过程中,有提到过注册service时候注册的Bean的类型是ServiceBean,不了解的可以看看。Service标签的解析。这里贴一下代码
public class ServiceAnnotationBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
ResourceLoaderAware, BeanClassLoaderAware {
...
private AbstractBeanDefinition buildServiceBeanDefinition(Service service, Class<?> interfaceClass,
String annotatedServiceBeanName) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
...
}
...
}
可以看到这个时候就对ServiceBean进行了注入,并且在创建的时候设置了其他重要的信息。
1.什么时候进行的服务导出
既然是服务的导出,这里就进入ServiceBean这个类进行分析。ServiceBean实现了ApplicationListener类。ApplicationListener是Spring的一个上下文监听器的接口类,作用是当完成Spring完成某项流程的时候就会发布一个广播事件,如果对这个事件进行了监听就会触发对应的逻辑。也就是监听器模式。
public void onApplicationEvent(ContextRefreshedEvent event) {
//如果还没有导出服务并且服务没有被取消导出
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//进行服务导出操作
export();
}
}
在上面我们看到了onApplicationEvent方法传入的对象是ContextRefreshedEvent。这个对象是当Spring的上下文被刷新或者加载完毕的时候触发的。因此服务就是在Spring的上下文刷新后进行导出操作的。其中是否延迟导出和服务是否被取消导出都可以进行设置。
2.进行服务导出
ServiceBean继承于ServiceConfig类(这个类是我们进行service配置的属性的所在的类)。并对export方法进行了重载
@Override
public void export() {
//调用父类的export方法
super.export();
// 服务导出完毕之后发布事件 ServiceBeanExportedEvent
publishExportEvent();
}
ServiceConfig类
public synchronized void export() {
//1.服务发布之前的参数检查和更新
checkAndUpdateSubConfigs();
//2.检查是否进行服务的导出
if (!shouldExport()) {
return;
}
//3.是否需要对服务进行延迟导出,就是使用定时任务对4步骤进行调用
if (shouldDelay()) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
//4.如果不需要进行延迟发布则现在就进行导出
doExport();
}
}
现在对上面可以看到整个导出流程进行解析
1.1服务发布之前的参数检查和更新
public void checkAndUpdateSubConfigs() {
// 使用在全局配置中明确定义的默认配置,主要是对信息的补全可以理解为,
// 在我们使用的时候很多配置都是没有配置,他会使用默认的配置对我们没有配置的位置进行补全
completeCompoundConfigs();
// 先启动配置中心,这个配置中心是ConfigManager类,其中保存了所有的服务导出和服务引用的配置
startConfigCenter();
//检查服务提供配置如果还没有创建就进行创建,并刷新
checkDefault();
//检查AppLication配置是否还没有创建,如果还没有创建就进行创建,,并刷新
checkApplication();
//同上面一样这里是检查注册列表配置
checkRegistry();
//检查协议配置
checkProtocol();
//进行刷新
this.refresh();
//元数据报告配置
checkMetadataReport();
//简介服务的接口名是不是空
/*
<bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/>
*/
if (StringUtils.isEmpty(interfaceName)) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
//如果服务的实现类是GenericService的实现类,
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
//如果服务的实现类不是GenericService的实现类
try {
//根据配置的接口地址获取接口的Class对象
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
//检查远程服务接口和方法是否符合Dubbo的要求。主要检查配置文件中配置的方法是否包含在远程服务接口中
checkInterfaceAndMethods(interfaceClass, methods);
//reference引用不应该为null,并且是给定接口的实现,即配置中ref指向的Bean是interface的实现类
checkRef();
generic = Boolean.FALSE.toString();
}
//local指的是服务接口的本地impl类名
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
//获取实现类的Class,是对Class.forName()的一种替换
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
//stub 服务接口类的本地代理类名,用于在客户端执行本地逻辑,如本地缓存等,该本地代理类的构造函数必须允许传入远程代理对象。local 和 stub 在功能应该是一致的,用于配置本地存根
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
//获取代理类的Class,
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
//检查实现类和代理类是不是服务接口类的实现
checkStubAndLocal(interfaceClass);
//
checkMock(interfaceClass);
}
可以看到这里的逻辑主要就是对配置的读取和设置,和一些类关系的检查
2和3. 对服务导出相关的检查就是对配置的读取
这里的检查主要是对配置的获取
private boolean shouldExport() {
Boolean shouldExport = getExport();
if (shouldExport == null && provider != null) {
shouldExport = provider.getExport();
}
// default value is true
if (shouldExport == null) {
return true;
}
return shouldExport;
}
private boolean shouldDelay() {
Integer delay = getDelay();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return delay != null && delay > 0;
}
4.服务的导出
在 doExportUrls 方法中对多协议,多注册中心进行了支持。
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
private void doExportUrls() {
//1.加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
//遍历 protocols,并在每个协议下导出服务
for (ProtocolConfig protocolConfig : protocols) {
//拼接path,group跟version组成一个pathKey(serviceName)
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//将相关的信息封装比如:接口类的Class对象,实现类对象,pathKey(serviceName)跟暴露的服务方法methodsToExport
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
//将providerModel保存到providedServices中
ApplicationModel.initProviderModel(pathKey, providerModel);
//2.组装URL
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
4.1加载注册中心链接loadRegistries方法
这个方法作用是加载注册表并将其转换为URL类型,优先级顺序为:system property> dubbo registry config。话不多说,直接上代码
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
//遍历配置中心的配置列表
for (RegistryConfig config : registries) {
//获取配置中心地址
String address = config.getAddress();
//如果address是空的则用默认的0.0.0.0
if (StringUtils.isEmpty(address)) {
address = Constants.ANYHOST_VALUE;
}
//如果address是合法的;NO_AVAILABLE="N/A"
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 添加 ApplicationConfig 中的字段信息到 map 中
appendParameters(map, application);
// 添加 RegistryConfig 字段信息到 map 中
appendParameters(map, config);
// 添加 path到 map 中
map.put(Constants.PATH_KEY, RegistryService.class.getName());
//添加dubbo版本,release版本,系统时间timestamp和pid到map中
appendRuntimeParameters(map);
//如果没有定义protocol类型,则使用默认的dubbo协议
if (!map.containsKey(Constants.PROTOCOL_KEY)) {
map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);
}
// 解析得到 URL 列表,address 可能包含多个注册中心 ip,
// 因此解析得到的是一个 URL 列表
List<URL> urls = UrlUtils.parseURLs(address, map);
//便利URl
for (URL url : urls) {
// 将 URL 协议的protocol设置为 registry
url = URLBuilder.from(url)
.addParameter(Constants.REGISTRY_KEY, url.getProtocol())
.setProtocol(Constants.REGISTRY_PROTOCOL)
.build();
// 通过判断条件,决定是否添加 url 到 registryList 中,条件满足以下两个之一就可以:
// 1. (服务提供者 && register = true 或 null)
// 2. (非服务提供者 && subscribe = true 或 null)
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
4.2组装URL并导出服务
URL是Dubbo配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递。因此URL组装的过程比较复杂
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//获取协议的类型名
String name = protocolConfig.getName();
//没有则为默认的dubbo
if (StringUtils.isEmpty(name)) {
name = Constants.DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
//// 添加 side、版本、时间戳以及进程号等信息到 map 中,通过反射将对象的字段信息添加到 map 中
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
//将method封装到map中
appendParameters(map, method, method.getName());
//获取method中的retry属性
String retryKey = method.getName() + ".retry";
//如果map中包含retry,则移除,如果设置retry为false则设置retries属性为0
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
//如果方法的参数配置列表不为空
if (CollectionUtils.isNotEmpty(arguments)) {
//获取 ArgumentConfig 列表
for (ArgumentConfig argument : arguments) {
// 参数类型不是null并且不是空字符串
if (argument.getType() != null && argument.getType().length() > 0) {
//通过反射获取 interfaceClass 的方法列表
Method[] methods = interfaceClass.getMethods();
// 如果方法不为空
if (methods != null && methods.length > 0) {
//便利所有的方法
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// 用反射得到的方法名对比MethodConfig中的method,查找目标方法,
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
//如果参数列表不是空
if (argument.getIndex() != -1) {
//根据获取的方法的参数类型跟ArgumentConfig中的参数类型比较,如果相同就将ArgumentConfig 字段信息到 map 中,或抛出异常
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// 如果存在多个回调方法
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 从参数类型列表中查找类型名称为 argument.type 的参数,然后封装
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
} else if (argument.getIndex() != -1) {
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
//当前服务是不是基础服务类是的则添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
//
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
//为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
// 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroy
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
//添加 token 到 map 中
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// 获取注册中心的ip和服务注册的ip,并保存到map中
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// 获取服务注册的端口,并保存到map中
Integer port = this.findConfigedPorts(protocolConfig, name, map);
//根据map中封装的信息组装成一个URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,则什么都不做
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// scope != remote,导出到本地
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// scope != local,导出到远程
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//导出服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
4.2.1到处服务到本地
服务导出到本地的时候,即配置的scope不是remote的时候。
private void exportLocal(URL url) {
// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 创建 InjvmExporter
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
4.2.2服务导出前invoker的创建
这里引用Dubbo官方文档中的说明:Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
//为目标类创建 Wrapper,但是包装器无法正确处理此方案:类名包含'$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
//------Wrapper类
public static Wrapper getWrapper(Class<?> c) {
//如果是动态代理的类,则获取他的父类
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
//如果是Object类,则按照Object类来创建
if (c == Object.class) {
return OBJECT_WRAPPER;
}
//从缓存中获取 Wrapper 实例
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
// 缓存未命中,创建 Wrapper
ret = makeWrapper(c);
// 写入缓存
WRAPPER_MAP.put(c, ret);
}
return ret;
}
//----------Wrapper类
private static Wrapper makeWrapper(Class<?> c) {
//检测 c 是否为基本类型,若是则抛出异常
if (c.isPrimitive()) {
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
String name = c.getName();
ClassLoader cl = ClassHelper.getClassLoader(c);
// c1 用于存储 setPropertyValue 方法代码
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
// c2 用于存储 getPropertyValue 方法代码
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
// c3 用于存储 invokeMethod 方法代码
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
// 生成类型转换代码及异常捕捉代码,比如:
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
// pts 用于存储成员变量名和类型
Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
// ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
// mns 为方法名列表
List<String> mns = new ArrayList<>(); // method names.
//dmns 用于存储“定义在当前类中的方法”的名称
List<String> dmns = new ArrayList<>(); // declaring method names.
// 获取 public 访问级别的字段,并为所有字段生成条件判断语句
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
// 忽略关键字 static 或 transient 修饰的变量
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
continue;
}
// 生成条件判断及赋值语句,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
// 生成条件判断及返回语句,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
// 存储 <字段名, 字段类型> 键值对到 pts 中
pts.put(fn, ft);
}
Method[] methods = c.getMethods();
// 检测 c 中是否包含在当前类中声明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
// 忽略 Object 中定义的方法
if (m.getDeclaringClass() == Object.class) {
continue;
}
String mn = m.getName();
// 生成方法名判断语句,比如:
// if ( "sayHello".equals( $2 )
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
// && $3.length == 2
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for (Method m2 : methods) {
// 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
// 对重载方法进行处理,考虑下面的方法:
// 1. void sayHello(Integer, String)
// 2. void sayHello(Integer, Integer)
// 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
// 需要进一步判断方法的参数类型
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成参数类型进行检测代码,比如:
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
// 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
c3.append(" ) { ");
// 根据返回值类型生成目标方法调用语句
if (m.getReturnType() == Void.TYPE) {
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
} else {
// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
}
// 添加 }, 生成的代码形如(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
//
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
// return null;
// }
c3.append(" }");
// 添加方法名到 mns 集合中
mns.add(mn);
// 检测当前方法是否在 c 中被声明的
if (m.getDeclaringClass() == c) {
// 若是,则将当前方法名添加到 dmns 中
dmns.add(mn);
}
ms.put(ReflectUtils.getDesc(m), m);
}
// 添加异常捕捉语句
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
// 添加 NoSuchMethodException 异常抛出代码
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// 处理 get/set 方法
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = entry.getValue();
// 匹配以 get 开头的方法
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
// 获取属性名
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("name") ) { return ($w).w.getName(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 is/has/can 开头的方法
} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
// 生成属性判断以及返回语句,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 set 开头的方法
} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
// 生成属性判断以及 setter 调用语句,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
pts.put(pn, pt);
}
}
// 添加 NoSuchPropertyException 异常抛出代码
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
// 创建类生成器
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 设置类名及超类
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class);
// 添加默认构造方法
cc.addDefaultConstructor();
// 添加字段
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
// 添加方法代码
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
// 生成类
Class<?> wc = cc.toClass();
// 设置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
// 创建 Wrapper 实例
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
其中cc.toClass()这个方法是对Javassist包的toClass方法的封装。对于Javassist可以去网上搜索相关的说明。
4.2.3服务导出
export方法定义在Protocol接口中,实现类很多。直接进入到RegistryProtocol类中。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//从Invoker中获取注册中心的URl
URL registryUrl = getRegistryUrl(originInvoker);
// 从Invoker中服务提供的URL
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
//获取服务 URL,保存新的服务提供者url,此时可能会有覆盖的情况,订阅是带有服务名称的缓存密钥,它会导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 创建监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//在服务导出之前获取现有配置规则并覆盖提供者URL
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//1.导出服务--------------------
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 向服务提供者与消费者注册表中注册服务提供者
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//获取 register 参数
boolean register = registeredProviderUrl.getParameter("register", true);
// 根据 register 的值决定是否注册服务
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//设置注册中心的URL
exporter.setRegisterUrl(registeredProviderUrl);
//设置新的服务提供的URL
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
上面逻辑主要有以下几步:
1.从用配置的ref对象生成的Invoker中获取,注册中心的URL跟服务提供方的URL;
2.调用doLocalExport导出服务;
3.向注册中心注册服务;
4.向注册中心进行订阅 override 数据;
5.创建并返回 DestroyableExporter
1.进入到doLocalExport方法进行分析
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 创建 Invoker 为委托类对象
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 调用 protocol 的 export 方法导出服务
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
这里调用的export方法会根据使用的协议类型来进入不同的Protocol实现类进行调用,我们常用的是dubbo协议,我们就直接进入DubboProtocol类进行分析。逻辑比较复杂。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//从包装过的invoker中获取providerUrl
URL url = invoker.getUrl();
// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 创建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
// 保存相关的信息一份到本地内存中
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
//--------------
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
String key = url.getAddress();
//client can export a service which's only for server to invoke
//服务提供方是服务的时候才能够导出
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
// 创建服务器实例
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
//----------
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
//添加服务器关闭时发送readonly事件,默认情况下启用
.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
// 添加心跳检测配置到 url 中
.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
// 添加编码解码器参数
.addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
.build();
// 获取 server 参数,当没有指定的时候使用默认值默认为 netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter(传输层) 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
//----------Exchangers类
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
//------HeaderExchanger类
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//--------Transporters类
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, handler);
}
//-------由于默认的使用的是netty所以这里进入NettyTransporter,这个类有两个包分别是对netty不同版本的支持,我们选择对netty4.0支持的类
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
//创建NettyServer
return new NettyServer(url, listener);
}
//----最终调用父类的AbstractServer类的AbstractServer方法
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
//设置基本属性
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 设置 ip 为 0.0.0.0
bindIp = Constants.ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 调用模板方法 doOpen 启动服务器
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
//---------doOpen方法
protected void doOpen() throws Throwable {
// 创建 ServerBootstrap
bootstrap = new ServerBootstrap();
// 创建 boss 和 worker 线程池
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// 绑定到指定的 ip 和端口上
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
4.2.4服务注册
在RegistryProtocol的export方法中有一个 向注册中心注册服务的方法register
public void register(URL registryUrl, URL registeredProviderUrl) {
// 获取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注册服务
registry.register(registeredProviderUrl);
}
第一个方法getRegistry方法在AbstractRegistryFactory类中实现
public Registry getRegistry(URL url) {
//创建URL
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 访问缓存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 缓存未命中,创建 Registry 实例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 写入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
其中createRegistry有很多实现,这里选择ZookeeperRegistryFactory
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
/**
* Invisible injection of zookeeper client via IOC/SPI
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
&esmp;进入ZookeeperRegistry方法
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//获取组名,如果为空则为默认值dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
//如果组名不是以“/”开头则需要加上
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
进入AbstractZookeeperTransporter类的connect方法
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
zookeeperClient = createZookeeperClient(toClientURL(url));
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
都比较容易理解就是创建zk的连接,先从缓存拿,拿不到就创建,然后继续查看CuratorZookeeperClient这个创建方法
public CuratorZookeeperClient(URL url) {
super(url);
try {
int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
也可以去官网查看解析
网友评论