本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
参考较多的博客:Dubbo中暴露服务的过程解析
文章内容顺序:
1.延迟暴露介绍,并引出方法入口
- 1.1服务暴露入口ServiceConfig#export()
- 1.2服务暴露的流程图解
2.服务暴露
- 2.1本地暴露的调用链路
- 2.2ServiceConfig#doExportUrls()
- 2.3ServiceConfig#doExportUrlsFor1Protocol(protocolConfig, registryURLs)
3.开始本地暴露——exportLocal(url)
4.Invoker 创建——proxyFactory扩展点的实现
- 4.1proxyFactory的包装类StubProxyFactoryWrapper
- 4.2扩展类JdkProxyFactory的getInvoker(T proxy, Class<T> type, URL url)
- 4.3实现了Invoker接口的AbstractProxyInvoker
- 4.4扩展类JavassistProxyFactory的getInvoker(T proxy, Class<T> type, URL url)
5.导出服务到本地——protocol扩展点的实现
- 5.1ProtocolFilterWrapper#export(invoekr)
- 5.2ProtocolListenerWrapper#export(invoekr)
- 5.3InjvmProtocol.export(invoker)
- 5.4ProtocolListenerWrapper的export方法的返回值
6.远程暴露
- 6.1远程暴露的doExportUrlsFor1Protocol方法代码逻辑
- 6.2protocol扩展点包装类的简介
- 6.3RegistrProtocol中注入的DubboProtocol
- 6.4RegistrProtocol#export(Invoker)
- 6.5RegistrProtocol的export调用了doLocalExport
- 6.6RegistrProtocol的doLocalExport调用了DubboProtocol#export(invoker)
- 6.7DubboProtoco的export调用了openServer(url)
- 6.8如果server不存在,还需要createServer()
7名词解释
1.延迟暴露
Dubbo中服务先不谈远程还是本地暴露,还有一种玩法是延迟暴露:
dubbo service默认是在容器启动的时候暴露的,暴露之后,消费者就可以调用服务提供者,但是如果此时服务提供者需要一定的时间初始化一些资源,那么就可以选择延迟暴露。比如在容器启动之后延迟10S再暴露。
dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay="5000"),另外一种是没有设置延迟暴露或者延迟设置为-1(delay="-1"):
- 设置了延迟暴露,dubbo在Spring实例化bean(initializeBean)的时候会对实现了
InitializingBean
的类进行回调,回调方法是afterPropertiesSet()
,如果设置了延迟暴露,dubbo在这个方法中进行服务的发布。- 没有设置延迟或者延迟为-1,dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了
ApplicationListener
的类进行回调onApplicationEvent()
,dubbo会在这个方法中发布服务。- 但是不管延迟与否,都是使用
ServiceConfig#export()
方法进行服务的暴露。使用export初始化的时候会将Bean对象转换成URL格式,所有Bean属性转换成URL的参数。
1.1服务暴露入口ServiceConfig#export()
来看看ServiceConfig#export()
方法的代码,这个方法就是我们整个服务暴露的入口代码了。
public synchronized void export() {
// 当 export 或者 delay 未配置,从 ProviderConfig 对象读取。
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
// 不暴露服务( export = false ) ,则不进行暴露服务逻辑。
if (export != null && !export) {
return;
}
// 延迟暴露
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
// 立即暴露
} else {
doExport();
}
}
可以看到方法借助了线程池来实现了延迟暴露,最后调用doExport方法先执行一系列的检查方法。
在
doExport()
中会会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。最后调用doExportUrls()
方法。在下面我们会从doExportUrls()
开始分析。
1.2服务暴露的流程图解
首先我们通过一个图来有个大概的概念,服务提供者暴露服务的主过程分为两大部分,第一部将持有的服务实例通过代理转换为Invoker,第二步把Invoker通过具体的协议(比如dubbo)转换为Exporter。如下图
image.png
首先要明确的一点:会在不配置 scope
(一个xml的配置项)的情况下,默认两种方式都暴露(本地和远程)。因为,Dubbo 自身无法确认应用中,是否存在本地引用的情况。
2.服务暴露
2.1本地暴露的调用链路
看完上面的图后再来看这个调用链路就好理解了多。(这个例子是本地暴露的链路,远程暴露的前面步骤与本地暴露没有太大区别,在
image.pngdoExportUrlsFor1Protocol()
中才会分开来)
2.2ServiceConfig#doExportUrls()
接下来我们就来看doExportUrls()
啦,以本地暴露为例:
private void doExportUrls() {
// 加载注册中心 URL 数组
List<URL> registryURLs = loadRegistries(true);
// 循环 `protocols` ,向不同的协议暴露暴露服务。
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
先来看看这第一个方法,Dubbo是支持多个注册中心的,所以需要数组存储注册中心的URL,同时也支持相同服务暴露多个协议,protocols存储的就是配置的不同的协议。真实的服务暴露逻辑在doExportUrlsFor1Protocol中。那我们就来看下doExportUrlsFor1Protocol。
2.3ServiceConfig#doExportUrlsFor1Protocol(protocolConfig, registryURLs)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// ... 【省略】 创建服务 URL 对象
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 服务本地暴露
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 服务远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 获得监控中心 URL
URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
} else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.io/books/dubbo-user-book/demos/explicit-target.html 。主要用于开发测试环境使用。
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
暴露服务,首先要让人知道的是你这个服务的ip和端口,这样引用者才知道怎么连接,连上服务器后,我们在考虑调用哪个服务模块的有哪些方法可以让引用者调用,引用才能知道怎么发起这个调用。所以我们在暴露服务的时候,IP,端口,服务名称,方法名称,是要放入到注册中心的(本地暴露的话注册中心就相当于本地内存)。因为最终服务的引用者是和注册中心进行交互的,获取信息的。那么确实dubbo也是这么做的,它将所有的数据都放到自定义的URL的实例中,然后,将注册中心的信息和URL的信息整合得到一起,到注册中心进行暴露服务。方法上面省略的部分就是在做URL的拼接工作。
3.开始本地暴露——exportLocal(url)
这是exportLocal(url)传的样例url,注意他的开头是dubbo:
dubbo://192.168.1.102:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.102&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5840&qos.port=22222&side=provider×tamp=1594401970747
先来看本地暴露的代码
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
// 创建本地 Dubbo URL,注意这里的setHost,已经设置成本地暴露了
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // injvm
.setHost(LOCALHOST) // 本地
.setPort(0); // 端口=0
// 添加服务的真实类名,例如 DemoServiceImpl ,仅用于 RestProtocol 中。
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 使用 ProxyFactory 创建 Invoker 对象
// 使用 Protocol 暴露 Invoker 对象
//proxyFactory在这里如果没有指定的话,由于SPI的自动注入机制,
//就是Javassist 代理工厂实现类,通过动态代理生成的Invoker
Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// 添加到 `exporters`
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
调用
ProxyFactory#getInvoker(proxy, type, url)
方法,创建 Invoker 对象。该 Invoker 对象,执行#invoke(invocation)
方法时,内部会调用 Service 对象( ref )对应的调用方法。调用
Protocol#export(invoker)
方法,暴露服务。
此处 Dubbo SPI 自适应的特性的好处就出来了,可以自动根据 URL 参数(注意方法内部已经对URL进行了修改),获得对应的拓展实现。例如,invoker 传入后,根据 invoker.url 自动获得对应 Protocol 拓展实现为 InjvmProtocol 。
实际上,Protocol 有两个 Wrapper 拓展实现类: ProtocolFilterWrapper、ProtocolListenerWrapper 。从上一篇是SPI机制我们知道,#export(...) 方法的调用顺序是:Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol
这里要注意的是:exporters仅仅是一个exporter的容器,主要用来执行unexport(),也就是可以理解为管理export(也就是管理invoker)的生命周期,而protocol.export(invoker)
才是把invoker暴露并且加到本地内存中的方法。他会将Invoker放到AbstractProtocol的exporterMap中,这是一个concurrentHashMap,以此来进行本地暴露。
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程。
4.Invoker 创建——proxyFactory扩展点的实现
在上面介绍的方法中调用了proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
来生成Invoker。proxyFactory的UML图如下
调用链路如下图所示。
image.png
ProxyFactory同样也是个扩展类,有Stub包装类,还有两个扩展类实现,可以自由决定用哪个Factory。Invoker通过Javassist动态代理或者JDK动态代理。
而技术的选型可以看下这张图作者的解释
image.png
4.1proxyFactory的包装类StubProxyFactoryWrapper
- ProxyFactory#getProxy(invoker)`链路其实是这样的,StubProxyFactoryWrapper的作用就是生成本地存根,在服务暴露中他并不会起作用,因为他是给消费者用的,所以会直接调用具体的proxyFactory实现。所以在此不做介绍,会在服务引用中详细说明。
private <T> Exporter<T> export(T instance, Class<T> type, URL url) {
return protocol.export(proxyFactory.getInvoker(instance, type, url));
}
4.2扩展类JdkProxyFactory的getInvoker(T proxy, Class<T> type, URL url)
注意:JdkProxyFactory
并非是proxyFactory
的默认实现,默认实现是JavassistProxyFactory
,只是JdkProxyFactory
更容易理解,先介绍这个,原理是一样的。
public class JdkProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 获得方法
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
// 调用方法
return method.invoke(proxy, arguments);
}
};
}
}
首先我们要明确,
getInvoker
的目的,是返回一个包裹了service类的Invoker类,这个Invoker类还需要有能执行service方法的方法。同时我们service方法是不确定的(比如有sayHello(),有sayGoodBye()等这种方法,需要通过url来知道我们调用的到底是哪个方法),所以最容易想到的办法便是在一个Invoker类类中使用反射来调用service方法。
可以看到,getInvoker
直接返回了一个匿名类,重写了doInvoke()方法,当一个Invoker类调用doInvoke()时,他就会调用反射,来执行service方法。通过这种方法我们就可以将各种各样的service都放到Invoker里。
试想一下,如果不用反射,怎么去调用service类中各种各样的方法名呢?(那就是另一个实现类的方法,用字节码技术来实现了)
4.3实现了Invoker接口的AbstractProxyInvoker
我们来简单看一下这个返回的AbstractProxyInvoker
是什么
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
/**
* 代理的对象,一般是 Service 实现对象
*/
private final T proxy;
/**
* 接口类型,一般是 Service 接口
*/
private final Class<T> type;
/**
* URL 对象,一般是暴露服务的 URL 对象
*/
private final URL url;
public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
if (proxy == null) {
throw new IllegalArgumentException("proxy == null");
}
if (type == null) {
throw new IllegalArgumentException("interface == null");
}
if (!type.isInstance(proxy)) { //
throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
}
this.proxy = proxy;
this.type = type;
this.url = url;
}
public Class<T> getInterface() {
return type;
}
public URL getUrl() {
return url;
}
public boolean isAvailable() {
return true;
}
@Override
public void destroy() {
}
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
/**
* 执行调用
*
* @param proxy 代理的对象
* @param methodName 方法名
* @param parameterTypes 方法参数类型数组
* @param arguments 方法参数数组
* @return 调用结果
* @throws Throwable 发生异常
*/
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
@Override
public String toString() {
return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());
}
}
可以看到,
AbstractProxyInvoker
实现了Invoker 接口,并且是个抽象类,将doInvoke
方法交由我们自己来实现,当调用invoke
方法的时候,会创建RpcResult
对象,将doInvoke
返回值作为结果包装返回。
也就是说,在JdkProxyFactory
中,我们getInvoker
返回了一个实现了Invoker
接口的匿名类,并且实现了AbstractProxyInvoker
中的抽象方法。使得Service都转换成Invoker,它也就代表一个可执行体,可向它发起invoke调用。
4.4扩展类JavassistProxyFactory的getInvoker(T proxy, Class<T> type, URL url)
了解了JdkProxyFactory 是怎么工作的之后,再来看JavassistProxyFactory就会好理解很多
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
同样的,最后也是返回了一个
AbstractProxyInvoker
类并实现了他的抽象方法。与JDK不同的是,他仅是将调用请求转发给了Wrapper
类的invokeMethod
方法。
Wrapper
用于“包裹”目标类,Wrapper
是一个抽象类,仅可通过getWrapper(Class)
方法创建子类。在创建Wrapper
子类的过程中,子类代码生成逻辑会对getWrapper
方法传入的Class
对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成invokeMethod
方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再创建 Wrapper 实例。
也就是说,因为JDK动态代理的性能问题,Javassist
选用的是将service
类中的方法名、参数等一些信息通过字节码技术拼接起来形成一个新的类,这样就可以调用service类中各种各样的方法了。
他最后生成的代码样例如下:
public class Wrapper1 extends Wrapper {
public static String[] pns;
public static Map pts;
public static String[] mns; // all method name array.
public static String[] dmns;
public static Class[] mts0;
public String[] getPropertyNames() {
return pns;
}
public boolean hasProperty(String n) {
return pts.containsKey($1);
}
public Class getPropertyType(String n) {
return (Class) pts.get($1);
}
public String[] getMethodNames() {
return mns;
}
public String[] getDeclaredMethodNames() {
return dmns;
}
public void setPropertyValue(Object o, String n, Object v) {
dubbo.provider.hello.service.impl.HelloServiceImpl w;
try {
w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
}
public Object getPropertyValue(Object o, String n) {
dubbo.provider.hello.service.impl.HelloServiceImpl w;
try {
w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + $2 + "\" filed or setter method in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
}
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
dubbo.provider.hello.service.impl.HelloServiceImpl w;
try {
w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals($2) && $3.length == 0) {
w.sayHello();
return null;
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
}
}
生成完Wrapper以后,返回一个
AbstractProxyInvoker
实例。至此生成Invoker
的步骤就完成了。可以看到Invoker
执行方法的时候,会调用Wrapper
的invokeMethod
,这个方法中会有真实的实现类调用真实方法的代码。
5.导出服务到本地——protocol扩展点的实现
上面介绍了下protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local))
中生成invoker的过程,这次来介绍下protocol.export()
是怎么把invoker暴露到本地的。
实际上,Protocol 有两个 Wrapper 拓展实现类:
ProtocolFilterWrapper
、ProtocolListenerWrapper
。从上一篇是SPI机制我们知道,#export(...) 方法的调用顺序是:Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol
5.1ProtocolFilterWrapper
那我们就来看看
ProtocolFilterWrapper
和ProtocolListenerWrapper
做了什么。首先是ProtocolFilterWrapper ,实现了Protocol 接口,Protocol 的 Wrapper 拓展实现类,用于给 Invoker 增加过滤链。他的export方法如下
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 注册中心 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) >{ return protocol.export(invoker); } // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。 return protocol.export(buildInvokerChain(invoker, >Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
至于buildInvokerChain()就暂时不往下分析了,里面给我绕的有点晕,简单来说还是用到了ExtensionLoader#getActivateExtension(url, key, group) 方法(关联一下上一篇提到的@Activate),获得过滤器数组。然后串起来。
5.2ProtocolListenerWrapper#export(invoekr)
再来看看
ProtocolListenerWrapper
注意,上面代码的return protocol.export(……)
中的protocol
已经指的是ProtocolListenerWrapper
,buildInvokerChain()
才是ProtocolFilterWrapper
做的事情。
那么来看一下ProtocolListenerWrapper#export(invoker)
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 判断协议是不是RegistryProtocol。在远程暴露服务会有符合暴露该判断的情况,与本地暴露无关
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 暴露服务,创建 Exporter 对象,这里protocol是InjvmProtocol
Exporter<T> exporter = protocol.export(invoker);
// 获得 ExporterListener 数组
List<ExporterListener> listeners = Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY));
// 创建带 ExporterListener 的 Exporter 对象
return new ListenerExporterWrapper<T>(exporter, listeners);
}
ListenerExporterWrapper装饰invoker, 在构造器中遍历listeners构建referer的监听链
5.3InjvmProtocol.export(invoker)
这里我们调用了
InjvmProtocol.export(invoker)
来暴露服务,并且返回 创建带ExporterListener
的ListenerExporterWrapper
对象。
先来看看InjvmProtocol.export(invoker)
的方法。这里new了一个InjvmExporter
,注意这里传的exporterMap。这个exporterMap实际上就是com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
看看这构造方法干了什么
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
// 添加到 Exporter 集合
exporterMap.put(key, this);
}
这个exporterMap实际上就是
com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap
,是一个ConcurrentHashMap,相当于本机的缓存,,也就是本地暴露。
这个InjvmExporter类中持有指向这个ConcurrentHashMap的指针,每有一个InjvmExporter类就会将他自己放到这个ConcurrentHashMap中。
5.4ProtocolListenerWrapper的export方法的返回值
回到我们的ProtocolListenerWrapper
最后我们看一下return new ListenerExporterWrapper<T>(exporter, listeners)
是个什么东西
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
// 执行监听器
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
构造方法,循环 listeners ,执行
ExporterListener#exported(listener)
。若执行过程中发生异常 RuntimeException ,打印错误日志,继续执行,最终才抛出,也就是监听啦。
至此我们简单看完了本地暴露的调用链路,接下来我们看看远程暴露有什么区别。
6.远程暴露
先来看看远程暴露的时候的url样例长啥样,注意这url的开头是registry。
registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&pid=5840&qos.port=22222®istry=multicast×tamp=1594401963972
6.1远程暴露的doExportUrlsFor1Protocol方法代码逻辑
这里还是再贴一下远程暴露的时候的代码
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// 服务本地暴露
//省略代码………
// 服务远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
// "dynamic" :服务是否动态注册,如果设为false,注册后将显示后disable状态,需人工启用,并且服务提供者停止时,也不会自动取消册,需人工禁用。
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 获得监控中心 URL
URL monitorUrl = loadMonitor(registryURL); // TODO 芋艿,监控
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`
exporters.add(exporter);
}
} else { // 用于被服务消费者直连服务提供者,参见文档 http://dubbo.io/books/dubbo-user-book/demos/explicit-target.html 。主要用于开发测试环境使用。
// 使用 ProxyFactory 创建 Invoker 对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
// 创建 DelegateProviderMetaDataInvoker 对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 使用 Protocol 暴露 Invoker 对象
Exporter<?> exporter = protocol.export(wrapperInvoker);
// 添加到 `exporters`,
exporters.add(exporter);
}
}
}
最下面的else代码与前面的if里差别在于,当配置注册中心为 "N/A" 时,表示即使远程暴露服务,也不向注册中心注册。这种方式用于被服务消费者直连服务提供者。
在开发及测试环境下,经常需要绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直连方式,将以服务接口为单位,忽略注册中心的提供者列表,A 接口配置点对点,不影响 B 接口从注册中心获取列表。
我们着眼于if中的方法调用。
- 调用 [
URL#addParameterAndEncoded(key, value)
]方法,将服务体用这的 URL 作为"export"
参数添加到注册中心的 URL 中。通过这样的方式,注册中心的 URL 中,包含了服务提供者的配置。- 调用
ProxyFactory#getInvoker(proxy, type, url)
方法,创建 Invoker 对象。该 Invoker 对象,执行#invoke(invocation)
方法时,内部会调用 Service 对象(ref
)对应的调用方法。- 创建
com.alibaba.dubbo.config.invoker.DelegateProviderMetaDataInvoker
对象。该对象在 Invoker 对象的基础上,增加了当前服务提供者 ServiceConfig 对象。- 调用
Protocol#export(invoker)
方法,暴露服务。
这个方法着重注意一下,我们远程暴露的url开头是registry,由于方法上的@Adaptive注释,会自适应扩展成RegistryProtocol。而根据我们前面本地暴露的介绍,会有两个wrapper拓展实习类来包装。最后的调用顺序为:
Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper =>RegistrProtocol =>Protocol\$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
是不是很神奇,虽然传的是RegistrProtocol,但是RegistrProtocol里面又调用了DubboProtocol。来看下是怎么调用的吧。
6.2protocol扩展点包装类的简介
再简单过一遍那两个包装类,在上文本地暴露的时候已经讲过这两个类的用处。
ProtocolFilterWrapper 的export方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
ProtocolListenerWrapper 的export方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
注意这两个方法里都有if,判断为RegistrProtocol就直接放行了。经过了两个包装类,终于到了RegistrProtocol实体类,那么他是怎么调用DubboProtocol的呢?
6.3RegistrProtocol调用DubboProtocol的export()
image.png直接上RegistrProtocol的属性图,还记得SPI的自动注入吗,Protocol的默认注入就是DubboProtocol(当然如果url中有配置就是配置的Protocol啦),所以RegistrProtocol类中有DubboProtocol。我们先看一下
RegistrProtocol#export(Invoker)
做了什么
6.4RegistrProtocol#export(Invoker)
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 暴露服务
// export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获得注册中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
// 获得注册中心对象
// registry provider
final Registry registry = getRegistry(originInvoker);
// 获得服务提供者 URL
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
// 向注册中心订阅服务消费者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
// 向注册中心注册服务提供者(自己)
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // // 标记向本地注册表的注册服务提供者,已经注册
}
// 使用 OverrideListener 对象,订阅配置规则
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
// 创建订阅配置规则的 URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
// 创建 OverrideListener 对象,并添加到 `overrideListeners` 中
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心,发起订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
这里我们可以这么讲:
RegistryProtocol
可以认为并不是一个真正的协议,可以认为他是这些实际的协议(dubbo . rmi)包装者,这样客户端的请求在一开始如果没有服务端的信息,会先从注册中心拉取服务的注册信息,然后再和服务端直连.
每个方法都注释的差不多了,来看看最重要的#doLocalExport(invoker)
方法,暴露服务是怎么做的。
6.5RegistrProtocol的export调用了doLocalExport
/**
* 暴露服务。
*
* 此处的 Local 指的是,本地启动服务,但是不包括向注册中心注册服务的意思。
*
* @param originInvoker 原始的Invoker
* @param <T> 泛型
* @return Exporter 对象
*/
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 获得在 `bounds` 中的缓存 Key
String key = getCacheKey(originInvoker);
// 从 `bounds` 获得已经暴露过服务
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
// 未暴露过,进行暴露服务
if (exporter == null) {
// 创建 Invoker Delegate 对象, 可以理解为得到一个Invoker代理,里面包含原来的Invoker
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 暴露服务,创建 Exporter 对象
// 使用 创建的Exporter对象 + originInvoker ,创建 ExporterChangeableWrapper 对象
//注意这个protocol,这个就是注入进来的DubboProtocol
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
// 添加到 `bounds`,本地暴露
bounds.put(key, exporter);
}
}
}
return exporter;
}
doLocalExport()
方法,暴露服务。此处的 Local 指的是,先进行本地暴露,本地启动服务(即netty等),但是不包括向注册中心注册服务的意思。使用【创建的 Exporter 对象】+【originInvoker】,创建
ExporterChangeableWrapper
对象。这样,originInvoker 就和 Exporter 对象,形成了绑定的关系。
而ExporterChangeableWrapper
的主要作用则是进行unexport()时的一些清理工作
doLocalExport简单介绍完之后,那么接下来就是在上述代码
//注意这个protocol,这个就是注入进来的DubboProtocol exporter = new ExporterChangeableWrapper<T>((Exporter<T>) >protocol.export(invokerDelegete), originInvoker)
中调用的ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol这一链路了。
两个Wrapper都在本地暴露里讲过了,直接来看DubboProtocol
吧,
6.6RegistrProtocol的doLocalExport调用了DubboProtocol#export(invoker)
先来简单介绍下DubboProtocol的属性
image.png
然后来看看他的export()
是怎么做的,这次在注释中已经写的比较明了了。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//key由serviceName,port,version,group组成
//当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。
//dubbo.common.hello.service.HelloService:20880
String key = serviceKey(url);
//将Invoker转换成Exporter
//直接new一个新实例
//没做啥处理,将invoker,key,exporterMap聚合在一起
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//缓存要暴露的服务,key是上面生成的
exporterMap.put(key, exporter);
//是否支持本地存根
//远程服务后,客户端通常只剩下接口,而实现全在服务器端,
//但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存,
//提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub,
//客户端生成Proxy时,会把Proxy通过构造函数传给Stub,
//然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//根据URL绑定IP与端口,建立NIO框架的Server
openServer(url);
// 初始化序列化优化器
optimizeSerialization(url);
return exporter;
}
6.7DubboProtoco的export调用了openServer(url)
看看他的openServer(url)
是怎么做的
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
if (isServer) {
ExchangeServer server = serverMap.get(key);
//同一JVM中,同协议的服务,共享同一个Server,
//第一个暴露服务的时候创建server,
//以后相同协议的服务都使用同一个server
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//同协议的服务后来暴露服务的则使用第一次创建的同一Server
//server支持reset,配合override功能使用
//accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化
//这时需要重新设置Server
server.reset(url);
}
}
}
6.8如果server不存在,还需要createServer()
继续看createServer()
方法:
private ExchangeServer createServer(URL url) {
// 默认开启 server 关闭时发送 READ_ONLY 事件
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 默认开启 heartbeat
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 校验 Server 的 Dubbo SPI 拓展是否存在
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 设置编解码器为 Dubbo ,即 DubboCountCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 启动服务器
//Exchangers是门面类,里面封装的是Exchanger的逻辑。
//Exchanger默认只有一个实现HeaderExchanger.
//Exchanger负责数据交换和网络通信。
//从Protocol进入Exchanger,标志着程序进入了remote层。
//这里requestHandler是ExchangeHandlerAdapter
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 校验 Client 的 Dubbo SPI 拓展是否存在
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
- //TODO Exchangers.bind(url, requestHandler)方法留待下次再讲吧
至此,远程暴露的调用链路也基本看完啦。
7.名词解释
下面是我在博客看到的,觉得有助于理解,就搬了过来。
Invoker
可执行的对象,执行具体的远程调用,能够根据方法名称,参数得到相应的执行结果。
Invocation,包含了需要执行的方法,参数等信息。
有三种类型的Invoker:
- 本地执行类的Invoker。
- 远程通信执行类的Invoker。
- 多个远程通信执行类的Invoker聚合成集群版的Invoker。
以HelloService为例:
- 本地执行类的Invoker:在Server端有HelloServiceImpl实现,要执行该接口,只需要通过反射执行对应的实现类即可。
- 远程通信执行类的Invoker:在Client端要想执行该接口的实现方法,需要先进行远程通信,发送要执行的参数信息给Server端,Server端利用本地执行Invoker的方式执行,最后将结果发送给Client。
- 集群版的Invoker:Client端使用的时候,通过集群版的Invoker操作,Invoker会挑选一个远程通信类型的Invoker来执行。
提供者端的Invoker封装了服务实现类,URL,Type,状态都是只读并且线程安全。通过发起invoke来具体调用服务类。
ProxyFactory
在服务提供者端,ProxyFactory主要服务的实现统一包装成一个Invoker,Invoker通过Javassist或者JDK动态代理来执行具体的Service实现对象的方法。默认的实现是JavassistProxyFactory,代码如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
Protocol
服务地址的发布和订阅。
Protocol是dubbo中的服务域,只在服务启用时加载,无状态,线程安全,是实体域Invoker暴露和引用的主功能入口,负责Invoker的生命周期管理,是Dubbo中远程服务调用层。
Protocol根据指定协议对外公布服务,当客户端根据协议调用这个服务时,Protocol会将客户端传递过来的Invocation参数交给Invoker去执行。
Protocol加入了远程通信协议,会根据客户端的请求来获取参数Invocation。
@Extension("dubbo")
public interface Protocol {
int getDefaultPort();
//对于服务提供端,将本地执行类的Invoker通过协议暴漏给外部
//外部可以通过协议发送执行参数Invocation,然后交给本地Invoker来执行
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//这个是针对服务消费端的,服务消费者从注册中心获取服务提供者发布的服务信息
//通过服务信息得知服务提供者使用的协议,然后服务消费者仍然使用该协议构造一个Invoker。这个Invoker是远程通信类的Invoker。
//执行时,需要将执行信息通过指定协议发送给服务提供者,服务提供者接收到参数Invocation,然后交给服务提供者的本地Invoker来执行
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
关于RegistryProtocol和DubboProtocol的疑惑
以下是官方文档说明:
暴露服务:
(1) 只暴露服务端口:
在没有注册中心,直接暴露提供者的情况下,即:
<dubbo:service regisrty="N/A" /> or <dubbo:registry address="N/A" />ServiceConfig解析出的URL的格式为:
dubbo://service-host/com.foo.FooService?version=1.0.0
基于扩展点的Adaptive机制,通过URL的"dubbo://"协议头识别,直接调用DubboProtocol的export()方法,打开服务端口。
(2) 向注册中心暴露服务:
在有注册中心,需要注册提供者地址的情况下,即:
<dubbo:registry address="zookeeper://10.20.153.10:2181" />
ServiceConfig解析出的URL的格式为:
registry://registry-host/com.alibaba.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")
基于扩展点的Adaptive机制,通过URL的"registry://"协议头识别,就会调用RegistryProtocol的export()方法,将export参数中的提供者URL,先注册到注册中心,再重新传给Protocol扩展点进行暴露:
dubbo://service-host/com.foo.FooService?version=1.0.0
基于扩展点的Adaptive机制,通过提供者URL的"dubbo://"协议头识别,就会调用DubboProtocol的export()方法,打开服务端口。
RegistryProtocol,注册中心协议集成,装饰真正暴露引用服务的协议,增强注册发布功能。
ServiceConfig中的protocol是被多层装饰的Protocol,是DubboProtocol+RegistryProtocol+ProtocolListenerWrapper+ProtocolFilterWrapper。
ProtocolFilterWrapper负责初始化invoker所有的Filter。
ProtocolListenerWrapper负责初始化暴露或引用服务的监听器。
RegistryProtocol负责注册服务到注册中心和向注册中心订阅服务。
DubboProtocol负责服务的具体暴露与引用,也负责网络传输层,信息交换层的初始化,以及底层NIO框架的初始化。
Exporter
负责invoker的生命周期,包含一个Invoker对象,可以撤销服务。
Exchanger
负责数据交换和网络通信的组件。每个Invoker都维护了一个ExchangeClient的 引用,并通过它和远端server进行通信。
网友评论