概述
最近做了个服务化项目,基于dubbo项目的,所以抽空把原本想看dubbo源码的愿望给实现了,等把dubbo的事情了解了再继续去完善flume部分的源码阅读。
这一章主要是讲dubbo服务发布的过程,其实dubbo服务的发布我认为其实有三个阶段:阶段一是指解析xml配置文件;阶段二是spring如何生成bean对象;阶段三是实现bean的服务发布。其中阶段二由于涉及到spring本身的bean初始化,这块暂时没有能力去讲解清楚。所以努力将其他两个阶段讲解清楚。
参考了很多其他人的文章,先把概念性的东西最先写出来,结合着后面的具体过程分析会比较好理解,这里共享了《益文的圈》的图片,特意备注出来。
基础概念普及
Invoker
public interface Invoker<T> extends Node {
Class<T> getInterface();
Result invoke(Invocation invocation) throws RpcException;
}
可执行对象的抽象,能够根据方法的名称、参数得到相应的执行结果。
Invoker可分为三类:
-
AbstractProxyInvoker:本地执行类的Invoker,实际通过Java反射的方式执行原始对象的方法。
-
AbstractInvoker:远程通信类的Invoker,实际通过通信协议发起远程调用请求并接收响应。
-
AbstractClusterInvoker:多个远程通信类的Invoker聚合成的集群版Invoker,加入了集群容错和负载均衡策略。
invoker
Invocation:包含了需要执行的方法和参数等重要信息,他有两个实现类RpcInvocation和MockInvocation。
ProxyFactory
public interface ProxyFactory {
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
服务接口代理抽象,用于生成一个接口的代理类。
- getInvoker方法:针对Server端,将服务对象(如DemoServiceImpl)包装成一个Invoker对象。
- getProxy方法:针对Client端,创建接口(如DemoService)的代理对象。
Exporter
public interface Exporter<T> {
Invoker<T> getInvoker();
void unexport();
}
维护Invoker的生命周期,内部包含Invoker或者ExporterMap。
Protocol
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
协议抽象接口。封装RPC调用。
- exporter方法:暴露远程服务(用于服务端),就是将Invoker对象通过协议暴露给外部。
- refer方法:引用远程服务(用于客户端),通过Clazz、url等信息创建远程的动态代理Invoker。
关系图
关系图- ServiceConfig包含ProxyFactory和Protocol,通过SPI的方式注入生成;
- ProxyFactory负责创建Invoker;
- Protocol负责通过Invoker生成Exporter,将服务启动并暴露;
dubbo的注册中心地址
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
application=demo-provider&dubbo=2.0.0&pid=1908®istry=zookeeper×tamp=1527741226496
dubbo的服务发布地址
dubbo://172.17.32.44:20880/com.alibaba.dubbo.demo.DemoService?
anyhost=true&application=demo-provider&bind.ip=172.17.32.44&bind.port=20880&dubbo=2.0.0&generic=false&
interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1908&side=provider×tamp=1527741226515
XML配置解析
dubbo的demo中的基本XML配置如下,基本上我们可以看到核心的几个配置包括dubbo:application、dubbo:registry、dubbo:protocol、dubbo:service,我们的解析过程就是对这个XML文件进行了解析。
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<!-- use multicast registry center to export service -->
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20881"/>
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" delay="-1"/>
</beans>
XML配置解析的过程主要是解析XML文件并生成BeanDefinition,这个BeanDefinition就是用于spring初始化bean实例的对象。DubboNamespaceHandler主要用于注册各个模型的解析类,其中的application、module、provider、consumer、service等都是dubbo的schema关键字。
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
DubboBeanDefinitionParser类作为解析XML的核心,通过parse方法返回BeanDefinition对象。这里Element和beanClass是一一对应的,解析dubbo:application标签对应的beanClass为ApplicationConfig.class,解析dubbo:service标签对应的beanClass为ServiceBean.class。下面代码里面标准了step_x用来解析每一步具体执行的内容。当然这里需要考虑每个类的继承关系,因为子类会继承父类的的方法。
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
//step_1 创建RootBeanDefinition对象并设置对应的beanClass。
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
//step_2 生成id的逻辑集中在这块
String id = element.getAttribute("id");
if ((id == null || id.length() == 0) && required) {
String generatedBeanName = element.getAttribute("name");
if (generatedBeanName == null || generatedBeanName.length() == 0) {
if (ProtocolConfig.class.equals(beanClass)) {
generatedBeanName = "dubbo";
} else {
generatedBeanName = element.getAttribute("interface");
}
}
if (generatedBeanName == null || generatedBeanName.length() == 0) {
generatedBeanName = beanClass.getName();
}
id = generatedBeanName;
int counter = 2;
while (parserContext.getRegistry().containsBeanDefinition(id)) {
id = generatedBeanName + (counter++);
}
}
if (id != null && id.length() > 0) {
if (parserContext.getRegistry().containsBeanDefinition(id)) {
throw new IllegalStateException("Duplicate spring bean id " + id);
}
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
beanDefinition.getPropertyValues().addPropertyValue("id", id);
}
//step_3 针对不同的beanClass做特殊字段的解析
if (ProtocolConfig.class.equals(beanClass)) {
for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
if (property != null) {
Object value = property.getValue();
if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
}
}
}
} else if (ServiceBean.class.equals(beanClass)) {
String className = element.getAttribute("class");
if (className != null && className.length() > 0) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
} else if (ProviderConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
} else if (ConsumerConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
}
//step_4 针对beanClass通过反射获取所有的set方法获取所有需要解析的属性放置在Set<String> props当中。
//beanDefinition.getPropertyValues().addPropertyValue将所有参数保存到beanDefinition当中。
Set<String> props = new HashSet<String>();
ManagedMap parameters = null;
for (Method setter : beanClass.getMethods()) {
String name = setter.getName();
if (name.length() > 3 && name.startsWith("set")
&& Modifier.isPublic(setter.getModifiers())
&& setter.getParameterTypes().length == 1) {
Class<?> type = setter.getParameterTypes()[0];
String property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), "-");
props.add(property);
Method getter = null;
try {
getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e) {
try {
getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e2) {
}
}
if (getter == null
|| !Modifier.isPublic(getter.getModifiers())
|| !type.equals(getter.getReturnType())) {
continue;
}
if ("parameters".equals(property)) {
parameters = parseParameters(element.getChildNodes(), beanDefinition);
} else if ("methods".equals(property)) {
parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
} else if ("arguments".equals(property)) {
parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
} else {
String value = element.getAttribute(property);
if (value != null) {
value = value.trim();
if (value.length() > 0) {
if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
beanDefinition.getPropertyValues().addPropertyValue(property, registryConfig);
} else if ("registry".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("registries", value, beanDefinition, parserContext);
} else if ("provider".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("providers", value, beanDefinition, parserContext);
} else if ("protocol".equals(property) && value.indexOf(',') != -1) {
parseMultiRef("protocols", value, beanDefinition, parserContext);
} else {
Object reference;
if (isPrimitive(type)) {
if ("async".equals(property) && "false".equals(value)
|| "timeout".equals(property) && "0".equals(value)
|| "delay".equals(property) && "0".equals(value)
|| "version".equals(property) && "0.0.0".equals(value)
|| "stat".equals(property) && "-1".equals(value)
|| "reliable".equals(property) && "false".equals(value)) {
// backward compatibility for the default value in old version's xsd
value = null;
}
reference = value;
} else if ("protocol".equals(property)
&& ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(value)
&& (!parserContext.getRegistry().containsBeanDefinition(value)
|| !ProtocolConfig.class.getName().equals(parserContext.getRegistry().getBeanDefinition(value).getBeanClassName()))) {
if ("dubbo:provider".equals(element.getTagName())) {
logger.warn("Recommended replace <dubbo:provider protocol=\"" + value + "\" ... /> to <dubbo:protocol name=\"" + value + "\" ... />");
}
// backward compatibility
ProtocolConfig protocol = new ProtocolConfig();
protocol.setName(value);
reference = protocol;
} else if ("onreturn".equals(property)) {
int index = value.lastIndexOf(".");
String returnRef = value.substring(0, index);
String returnMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(returnRef);
beanDefinition.getPropertyValues().addPropertyValue("onreturnMethod", returnMethod);
} else if ("onthrow".equals(property)) {
int index = value.lastIndexOf(".");
String throwRef = value.substring(0, index);
String throwMethod = value.substring(index + 1);
reference = new RuntimeBeanReference(throwRef);
beanDefinition.getPropertyValues().addPropertyValue("onthrowMethod", throwMethod);
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
}
}
reference = new RuntimeBeanReference(value);
}
beanDefinition.getPropertyValues().addPropertyValue(property, reference);
}
}
}
}
}
}
NamedNodeMap attributes = element.getAttributes();
int len = attributes.getLength();
for (int i = 0; i < len; i++) {
Node node = attributes.item(i);
String name = node.getLocalName();
if (!props.contains(name)) {
if (parameters == null) {
parameters = new ManagedMap();
}
String value = node.getNodeValue();
parameters.put(name, new TypedStringValue(value, String.class));
}
}
if (parameters != null) {
beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
}
return beanDefinition;
}
bean初始化过程
整个bean的实例化是由spring内部实现的(暂时未来得及分析其中过程),但是实例化后收尾的初始化工作其实是在dubbo当中完成的,我们就暂时忽略其中的spring初始化bean的过程直接分析后面的的过程。service的发布过程集中在ServiceBean类当中。这里我们关注onApplicationEvent()和afterPropertiesSet()方法。这里先补充下bean的初始化过程。
1、启动容器
2、调用BeanFactoryPostProcessor的postProcessBeanFactory()方法进行工厂后处理
3、getBean()调用某一个Bean
4、调用InstantiationAwareBeanPostProcessor 的postProcessBeforeInstantiation()方法。
5、实例化Bean(构造函数、工厂方法)
6、调用InstantiationAwareBeanPostProcessor 的postProcessAfterInstantiation()方法。
7、调用InstantiationAwareBeanPostProcessor 的postProcessPropertyValues()方法。
8、设置Bean的属性值
9、调用BeanNameAware的setBeanName()方法
10、调用BeanFactoryAware的setBeanFactory()方法
11、调用ApplicationContextAware的setApplicationContext()方法
12、调用BeanPostProcessor的postProcessBeforeInitialization()方法
13、调用InitializingBean的afterPropertiesSet()方法
14、调用init-method初始化方法
15、调用BeanPostProcessor的postProcessAfterInitialization()方法
16、如果singleton则缓存bean,如果prototype则返回bean。
17、如果singleton则在容器销毁的时候调用DisposableBean的destroy()方法
18、如果singleton则在容器销毁的时候调用destory-method方法
跟进ServiceBean.java类,主要是关注onApplicationEvent和afterPropertiesSet和setApplicationContext方法,其中setApplicationContext方法要早于afterPropertiesSet方法执行。
setApplicationContext负责把spring的上下文注入到当前bean当中,afterPropertiesSet负责初始化该bean,onApplicationEvent负责把bean发布成service。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
private final transient Service service;
private transient ApplicationContext applicationContext;
private transient String beanName;
public ServiceBean() {
super();
this.service = null;
}
public void setApplicationContext(ApplicationContext applicationContext) {
}
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//todo 这里的核心初始化bean后的动作,会进入ServiceConfig当中去执行
export();
}
}
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
}
return supportedApplicationListener && (delay == null || delay == -1);
}
@SuppressWarnings({"unchecked", "deprecation"})
public void afterPropertiesSet() throws Exception {
}
public void destroy() throws Exception {
unexport();
}
}
ServiceBean的初始化过程关键在于afterPropertiesSet,内部主要获取provider、application、module、registries、monitor、protocols、path等对象并注入到bean当中,整个过去方法其实就是从spring的上下文中获取对象,BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false)方法就是用来实现这个功能的。
代码末尾的export其实本身是没有执行的,真正执行export是在onApplicationEvent当中实现的,所以我们后面需要跟踪核心的export过程。
public void afterPropertiesSet() throws Exception {
//step_1 获取provider设置到bean当中
if (getProvider() == null) {
//todo 从上下文获取是否有对应的对象
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
&& providerConfigMap.size() > 1) { // backward compatibility
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault().booleanValue()) {
providerConfigs.add(config);
}
}
if (providerConfigs.size() > 0) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
//step_2 获取application设置到bean当中
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
//step_3 获取module设置到bean当中
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
//step_4 获取registries设置到bean当中
if ((getRegistries() == null || getRegistries().size() == 0)
&& (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
for (RegistryConfig config : registryConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
registryConfigs.add(config);
}
}
if (registryConfigs != null && registryConfigs.size() > 0) {
super.setRegistries(registryConfigs);
}
}
}
//step_5 获取monitor设置到bean当中
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
//step_6 获取protocols设置到bean当中
if ((getProtocols() == null || getProtocols().size() == 0)
&& (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
for (ProtocolConfig config : protocolConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
protocolConfigs.add(config);
}
}
if (protocolConfigs != null && protocolConfigs.size() > 0) {
super.setProtocols(protocolConfigs);
}
}
}
//step_7 获取path设置到bean当中
if (getPath() == null || getPath().length() == 0) {
if (beanName != null && beanName.length() > 0
&& getInterface() != null && getInterface().length() > 0
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
//todo 如果不是延迟暴露就会立即export,否则应该会等到onApplicationEvent中去执行暴露
if (!isDelay()) {
export();
}
}
bean暴露过程
整个服务的暴露过程如下图,共享自《益文的圈》,这里的指假设我们没有指定本地发布或远程发布,那么就会执行两个发布,暂时只关注远程发布。
首先我们需要了解下ServiceBean的类继承关系,ServiceBean的onApplicationEvent方法中export()因为我们刚刚说的export方法其实就是在ServiceConfig类当中的。后面我们会就ServiceConfig类的export()方法实现跟踪。
ServiceBean的类图执行ServiceConfig类的doExport方法的核心逻辑,doExport -> doExportUrls -> doExportUrlsFor1Protocol,这里需要关注registryURLs的生成方法loadRegistries,也就是说我们的注册中心的url地址是如何生成的。
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
注册中心URL生成过程主要集中在loadRegistries方法中,整个思路就是遍历所有dubbo:registry标签生成的registries列表,针对每个registry结合application+registry自身的config最后生成注册中心的URL。
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
核心重点来了,doExportUrlsFor1Protocol部分主要是负责导出local和remote的dubbo服务,暂时关注导出remote服务,整体步骤是:
- 1、 proxyFactory生成invoker,这里factory为JavassistProxyFactory。(dubug跟进下)
- 2、由protocol导出为Exporter,这里protocol为RegistryProtocol。(dubug跟进下)
由于整个暴露过程还是比较重要的,所以继续深入进行分析。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略一些不重要的代码
String scope = url.getParameter(Constants.SCOPE_KEY);
//todo scope值为null,所以会发布到local
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//发布local的export
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope值为null,所以会发布到remote
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
// todo 核心重点
//step_1 proxyFactory生成invoker
//step_2 生成wrapperInvoker
//step_3 由protocol导出为Exporter
//step_4 加入到exporters当中
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
首先proxyFactory在这里其实由dubbo的扩展机制生成的,内部实现是通过javassit生成的源码。proxyFactory负责生产invoker,Protocol负责到处Export对象。
//step_1 proxyFactory生成invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//step_2 生成wrapperInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//step_3 由protocol导出为Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
proxyFactory.getInvoker()的调用链路如下,proxyFactory其实代表的是ProxyFactory$Adaptive对象。
Invoker<?> invoker = proxyFactory.getInvoker()
等同于ProxyFactory$Adaptive.getInvoker(ref, interfaceClass, dubboUrl)
内部执行 extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
返回StubProxyFactoryWrapper
extension.getInvoker(arg0, arg1, arg2)
等同于StubProxyFactoryWrapper.getInvoker(arg0, arg1, arg2)
内部执行javassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
返回通过new AbstractProxyInvoker()创建的对象
ProxyFactory$Adaptive的源码如下,服务暴露需要关注getInvoker()方法。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
JavassistProxyFactory的代码中我们需要关注getInvoker方法,内部其实就是通过wrapper.invokeMethod()方法实现调用。
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
protocol.export()调用链如下,
protocol.export() 等同 Protocol$Adaptive.export()
执行 extension = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
返回 ProtocolListenerWrapper
执行 extension.export(arg0) 等同 ProtocolListenerWrapper.export(Invoker<T> invoker)
执行 ProtocolFilterWrapper.export(Invoker<T> invoker)
执行RegistryProtocol.export()
(1)执行doLocalExport()将服务在本地开启端口监听之类并返回exporter
执行Protocol$Adaptive.export()
ProtocolFilterWrapper.export() -> ProtocolListenerWrapper.export() -> DubboProtocol.export() -> openServer开启服务监听
(2)执行register()方法将服务发布将服务注册到zookeeper上
Protocol$Adaptive的源码如下,服务暴露需要关注export()方法。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
DubboProtocol的核心export部分代码如下,核心事情包括openServer开启监听;创建exporter对象并返回。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//todo 核心在于打开socket连接
openServer(url);
optimizeSerialization(url);
return exporter;
}
整体流程
服务暴露整体流程参考知识点
spring bean初始化过程
bean生成过程bean初始化顺序
网友评论