先上dubbo初始化调用栈:
dubbo初始化1.png
一、消费者bean初始化1----spring入口
这节主要介绍dubbo的消费者和spring结合时,初始化的过程。
这里默认使用@Reference注解注入dubbo的bean。
可以看到,Spring bean在初始化的时候,需要在populateBean方法进行注入属性,这里注入就是dubbo bean属性,这里都是由spring管理的。但是在factory.getObject()这里,factory就是dubbo的ReferenceBean,它重写了getObject方法。
二、消费者bean初始化2----RegistryProtocol执行
这里只介绍init()方法中的 ref = createProxy(map)方法。进去之后,核心方法为invoker = refprotocol.refer(interfaceClass, urls.get(0))。重点来了refprotocol是什么?如下:
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
发现refprotocol是dubbo SPI的一个自适应拓展类。该类是由Javassist生成的代理类。在调用refer方法的时候,会去调用ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(“key”),然后这个时候的key一定是registry,所以会走到RegistryProtocol.refer。但是dubbo SPI机制,会自动把wapper封装上去。如下:
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
详细见dubbo的SPI详解,可知RegistryProtocol会被两个默认的wapper封装:ProtocolFilterWrapper和ProtocolListenerWrapper(多个wapper之间的顺序,目前好像没有设置,直接从Set取封装的)。这个是dubbo的spi,对protocol接口默认配置。所以下一步一定会调用到ProtocolListenerWrapper上,代码如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
该接口对registry特殊处理,不进行ListenerInvokerWrapper封装,直接调用下一层的protocol,下一层就是ProtocolFilterWrapper,代码如下:
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url); // 服务消费者注册并引用服务
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
Constants.CONSUMER); // 消费者引用服务的拦截器
}
该接口也对registry特殊处理,不进行buildInvokerChain封装,直接调用下一层的protocol,下一层就是真正的RegistryProtocol了,代码如下:
// 引用远程服务
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY))
.removeParameter(Constants.REGISTRY_KEY);
// 基于注册中心构建调用执行体的本地动态代理
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// refer=application%3Dxxx%26interface%3Dcom.xxx.XxxService%26methods%3Dxxx%26register.ip%3Dxxx%26remote.timestamp%3Dxxx
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
到这里只是执行了RegistryProtocol的refer方法,并没有初始化真正的Invoker。看下最上面的一行代码,url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)),这里把url的protocol参数从registry替换成了dubbo。为下面的正在初始化做准备,下面会讲到。
三、消费者bean初始化3----directory.subscribe()
上面介绍了RegistryProtocol的执行,真正初始化Invoker的地方在,doRefer方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 注册目录
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// REFER_KEY的所有属性
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY),
0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 将服务消费者订阅配置注册到服务注册中心
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY
+ "," + Constants.CELLS_CATEGORY));
Invoker<T> invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这里的核心方法,directory.subscribe就是订阅zk,并且初始化Invoker。一直跟踪到如下图所示:
dubbo3.png
这里可以看到toInvokers方法,其中初始化Invoker的代码如下:
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
到这里我们终于又看到了protocol.refer方法了。这里的protocol还是SPI自适应拓展实现类,正在执行的protocol需要根据url上的参数决定,这时的protocol=dubbo。但是由于Wapper类会自动包装在DubboProtocol上,所以这里还是先执行了ProtocolListenerWrapper了,然后执行ProtocolFilterWrapper。我们再把他们的代码看一遍:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url); // 服务消费者注册并引用服务
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
Constants.CONSUMER); // 消费者引用服务的拦截器
}
这个时候是dubbo协议了,不会被register协议拦截了。所以,会走到各自方法下面的逻辑。
ProtocolListenerWrapper会用ListenerInvokerWrapper包装下面的Invoker。下面的Invoker会通过buildInvokerChain构造一个filterChain。看下buildInvokerChain源码:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 拦截器列表
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
.getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i--) {
//从最后一个filter开始封装,它会被封装到最底层。
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 进行调用拦截
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
通过SPI的getActivateExtension方法,获取dubbo默认激活的filter。然后一层一层filter封装。可以看到,封装的时候从列表的最后一个filter开始封装,封装到最底层,最接近DubboInvoker。DubboInvoker是真正的一个执行调用的地方,它里面应该包含了一个网络client,用来把dubbo调用请求,通过网络的方式发送到提供者那里。
四、消费者bean初始化4----DubboInvoker的初始化
DubboInvoker的初始化,主要是用来初始化ExchangeClient的。代码如下:
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
public DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers) {
super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
this.clients = clients;
// get version.
this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0");
this.invokers = invokers;
}
核心方法在getClients方法上,如下:
private ExchangeClient[] getClients(URL url) {
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
核心方法是initClient(url),这里就不向下展开了。总的来说,下面的代码就是通过url上的参数值,根据SPI获取相应的Exchanger,Transporter,然后初始化NettyClient。初始化好后,基本上tcp连接就建立了。到这里一个真正的Invoker,它包含了各种封装的Invoker。
下面简单看下一个dubbo消费者bean的调用链路:
image.png
可以看到从getByCouponCode的业务方法,到调用到真正Invoker的DubboInvoker,中间经过了很多封装类型的Invoker操作。
所以说,Invoker是dubbo核心领域模型。一切都是围绕着Invoker来的,这也符合dubbo的设计原则:微核心+插件式。
- 微核心:Invoker,Exporter,Invocation,这些都是dubbo抽象出来的核心领域模型,一切高级特性都是围绕着它们来。比如:负载均衡,过滤器等特性
- 插件式:通过SPI方法,提供插件式的拓展性。比如:protocol,serialize等协议,都是可替换的。
肯定有错误的地方,欢迎指正和讨论
网友评论