本文将详细分析Dubbo的服务发布流程,建议结合文章Dubbo SPI 机制解析一起阅读。
在开始分析之前,有必须熟悉一下Dubbo源码的目录结构,以及各模块的功能。
模块说明:
dubbo-common 公共逻辑模块:包括 Util 类和通用模型。
dubbo-remoting 远程通讯模块:相当于 Dubbo 协议的实现,如果 RPC 用 RMI协议则不需要使用此包。
dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。
dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡, 容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。
dubbo-registry 注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。
dubbo-monitor 监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。
dubbo-config 配置模块:是 Dubbo 对外的 API,用户通过 Config 使用Dubbo,隐藏 Dubbo 所有细节。
dubbo-container 容器模块:是一个 Standlone 的容器,以简单的 Main 加载 Spring 启动,因为服务通常不需要 Tomcat/JBoss 等 Web 容器的特性,没必要用 Web 容器去加载服务。
Spring 对外留出的扩展
dubbo是基于spring 配置来实现服务的发布的,那么一定是基于spring的扩展来写了一套自己的标签。在dubbo配置文件中看到的<dubbo:service> ,就是属于自定义扩展标签。
要实现自定义扩展,有三个步骤(在spring中定义了两个接口,用来实现扩展)
1.NamespaceHandler: 注册一堆BeanDefinitionParser,利用他们来进行解析
2.BeanDefinitionParser:用于解析每个element的内容
3.Spring默认会加载jar包下的META-INF/spring.handlers文件寻找对应的NamespaceHandler。
以下是Dubbo-config模块下dubbo-config-spring的配置:
也就是说会通过DubboNamespaceHandler去解析dubbo自定义的标签。DubboBeanDefinitionParser用于把不同的配置分别转化成spring容器中的bean对象。
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
为了在spring启动的时候,也相应的启动了发布服务和注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean ServiceBean、ReferenceBean。分别继承了ServiceConfig和ReferenceConfig。并分别实现了InitializingBean、DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware。
InitializingBean为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
DisposableBean bean被销毁的时候,spring容器会自动执行destory方法,比如释放资源。
ApplicationContextAware 实现了这个接口的bean,当spring容器初始化的时候,会自动的将ApplicationContext注入进来。
ApplicationListener ApplicationEvent事件监听,spring容器启动后会发一个事件通知。
BeanNameAware 获得自身初始化时,本身的bean的id属性。
由此可以看出,Dubbo 的服务发布流程的实现思路是:
1.利用spring的解析收集xml中的配置信息,然后把这些配置信息存储到serviceConfig中。
2.调用ServiceConfig的export方法来进行服务的发布和注册。
ServiceConfig的export
delay的使用
export()中有一个delay参数,用于判断服务是否需要延迟加载。而延迟的方式也很直截了当,Thread.sleep(delay)。
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
doExportUrls
export方法先判断是否需要延迟暴露,然后执行doExport方法。doExport方法先执行一系列的检查方法,然后调用doExportUrls方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。
@SuppressWarnings({ "unchecked", "rawtypes" })
private void doExportUrls() {
//是不是获得注册中心的配置
List<URL> registryURLs = loadRegistries(true);
//是不是支持多协议发布
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrls方法先调用loadRegistries获取所有的注册中心url,然后遍历调用doExportUrlsFor1Protocol方法。对于在标签中指定了registry属性的Bean,会在加载BeanDefinition的时候就加载了注册中心。
获取注册中心url,会把注册的信息都放在一个URL对象中,URL内容如下:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&organization=&owner=&pid=2939®istry=zookeeper×tamp=1488898049284
如果scope没有配置或者配置local、remote,dubbo会将服务export到本地,URL内容如下:
dubbo://192.168.1.100:20880/dubbo.common.hello.service.HelloService?anyhost=true&application=dubbo-provider&application.version=1.0&delay=5000&dubbo=2.5.3&environment=product&interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=&owner=&pid=2939&side=provider×tamp=1488898464953
发布服务
//通过proxyFactory来获取Invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//注册服务
Exporter<?> exporter = protocol.export(invoker);
这个地方可以做一个小结:
1.Invoker – 执行具体的远程调用
2.Protocol – 服务地址的发布和订阅
3.Exporter – 暴露服务或取消暴露
Protocol 的变化过程
第一步,在ServiceConfig中有一个静态变量protocol,通过Dubbo SPI 机制解析一文可知,此时protocol是Protocol$Adaptive。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
第二步,调用Protocol$Adaptive中的export方法
public Exporter export(Invoker arg0) throws Invoker {
if (arg0 == null) throw new IllegalArgumentException("Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("Invoker argument getUrl() == null");URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(Protocol) name from url(" + url.toString() + ") use keys([protocol])");
Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(arg0);
}
上面这段代码做两个事情
1.从url中获得protocol的协议地址,如果protocol为空,表示已dubbo协议发布服务,否则根据配置的协议类型来发布服务。
2.调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName)。
getExtension()这个方法的主要作用是用来获取ExtensionLoader实例代表的扩展的指定实现,首先从cachedInstances缓存中查找是否已经创建了实例,如果没有调用createExtension()。
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//对获取的的和实例进行依赖注入
injectExtension(instance);
//cachedWrapperClasses是在loadFile中进行赋值的
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
// 对实例进行包装,分别调用带Protocol参数的构造函数创建实例,然后进行依赖注入。
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
由此可以看出,当我们传入的值是registry时,得到的是RegistryProtocol;
当我们传入的值是dubbo时,会通过Wrapper对Protocol进行装饰,装饰器为ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper(ProtocolListenerWrapper(DubboProtocol))。
ProtocolFilterWrapper
这个类非常重要,dubbo机制里面日志记录、超时等等功能都是在这一部分实现的。这个类有3个特点,
1.它有一个参数为Protocol protocol的构造函数;
2.它实现了Protocol接口;
3.它使用责任链模式,对export和refer函数进行了封装。
ProtocolFilterWrapper会根据条件获取当前扩展可自动激活的实现,主要包括以下实现类:
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
这其中涉及到很多功能,包括权限验证、异常、超时、计算调用时间等都在这些类实现。
DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
openServer(url);
return exporter;
}
openServer开启服务:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
createServer创建服务,开启心跳检测,默认使用netty
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
调用Exchangers.bind()方法,Exchanger是扩展点,此时会自适应加载默认扩展点HeaderExchanger。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
调用Transporters.bind()方法,Transporter也是扩展点,此时会自适应加载默认扩展点NettyTransporter。通过NettyTranport创建基于Netty的server服务。
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
RegistryProtocol.export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
//得到需要注册到zk上的协议地址,也就是dubbo://
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
getRegistry()方法是invoker的地址获取registry实例
private Registry getRegistry(final Invoker<?> originInvoker){
URL registryUrl = originInvoker.getUrl(); //registry://
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
//得到zookeeper的协议地址
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}///registryUrl就会变成了zookeeper://
return registryFactory.getRegistry(registryUrl);
}
通过前面这段代码的分析,其实就是把registry的协议头改成服务提供者配置的协议地址,也就是我们配置的
<dubbo:registry address=”zookeeper://ip:port”/>,然后registryFactory.getRegistry的目的,就是通过协议地址匹配到对应的注册中心。
RegistryFactory是扩展点,此时会自适应加载默认扩展点ZookeeperRegistryFactory,ZookeeperRegistryFactory中并没有getRegistry方法,而是在父类AbstractRegistryFactory。AbstractRegistryFactory首先从缓存REGISTRIES中,根据key获得对应的Registry。如果没有则调用子类,创建一个注册中心。
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
代码分析到这里,我们对于getRegistry得出了一个结论,根据当前注册中心的配置信息,获得一个匹配的注册中心,也就是ZookeeperRegistry。接下来会调用registry.register(registedProviderUrl),将dubbo://的协议地址注册到zookeeper上。
因为ZookeeperRegistry这个类中并没有register这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的register方法。
FailbackRegistry.register
FailbackRegistry,从名字上来看,是一个失败重试机制。FailbackRegistry.register会先调用父类的register方法,将当前url添加到缓存集合中。然后调用调用doRegister方法,是一个抽象方法,会由ZookeeperRegistry子类实现。
ZookeeperRegistry.doRegister创建节点
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
Invoker
前面花了很大的篇幅讲解protocol.export()方法,下面将讲解Invoker的实现过程。
/**
* 暴露远程服务:
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
服务实现类转换成Invoker,大概的步骤是:
根据proxyFactory方法调用具体的ProxyFactory实现类的getInvoker方法获取Invoker。
getInvoker的过程是,首先对实现类做一个包装,生成一个包装后的类。
然后新创建一个Invoker实例,这个Invoker中包含着生成的Wrapper类,Wrapper类中有具体的实现类。
第一步,在ServiceConfig中有一个静态变量proxyFactory,是一个自适应扩展点。
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
使用JavassistProxyFactory获取Invoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
//第一步封装一个Wrapper类
//该类是手动生成的
//如果类是以$开头,就使用接口类型获取,其他的使用实现类获取
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
//关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
使用JdkProxyFactory获取invoker
JdkProxyFactory的getInvoker方法,直接返回一个AbstractProxyInvoker实例,没有做处理,只是使用反射调用具体的方法。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
总结
加载dubbo配置文件,解析生成ServiceConfig,进行服务发布。重点关注ServiceConfig里的两个成员变量protocol和proxyFactory。
1、首先将服务的实现封装成一个Invoker,Invoker中封装了服务的实现类。
2、将Invoker封装成Exporter,并缓存起来,缓存里使用Invoker的url作为key。
3、服务端Server启动,监听端口。(请求来到时,根据请求信息生成key,到缓存查找Exporter,就找到了Invoker,就可以完成调用。)
网友评论