在dubbo服务的启动过程中,看到,所有的dubbo自定义标签都会由DubboNamespaceHandler
处理, 遇到reference
标签,如何发现服务的?
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
在Spring容器启动过程中,遇到reference
标签会初始化一个ReferenceBean
,在初始化这个bean的过程中,获取到服务提供方提供的proxy服务
。
官方文档:
reference按照官方文档的说法,应该是由ReferenceConfig
,调用DubboProtol
获取DubboInvoker
,然后通过ProxyFactory
获取到下游ref对象的代理。
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
public Object getObject() throws Exception {
//返回的真实对象
return get();
}
public void afterPropertiesSet() throws Exception {
//省略一万行代码
Boolean b = isInit();
if (b != null && b.booleanValue()) {
getObject();
}
}
}
从ReferenceBean
对象定义实现了FactoryBean
接口能够看到,该对象初始化完成之后返回的是getObject()
方法的返回对象,实现了InitializingBean
接口,在该对象初始化完成之后调用afterPropertiesSet()
,最终都会调用ReferenceConfig
的get()
方法获取ref
对象。
// interface proxy reference
private transient volatile T ref;
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
//初始化
init();
}
return ref;
}
private void init() {
if (initialized) {
return;
}
initialized = true;
//拼接URLkey=value参数,保存在Map中
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prefix = StringUtils.getServiceKey(map);
//忽略很多代码
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
//重点在这里,创建一个Proxy,拿到ref对象。
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
在创建ref对象时,是通过注册中心配置,拼接URL,当注册中心只有一个服务时,直接去引用服务,当注册中心有多个服务时,则先将多个服务分别get到,然后对外统一组装成一个服务,然后通过代理工厂产生了一个代理对象。
//这里忽略很多代码 点对点,只分析dubbo协议,且只从注册中心订阅服务的场景
//这里加载配置中心
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
//registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=test-protocol-random-port&dubbo=2.0.0&pid=2264&refer=application%3Dtest-protocol-random-port%26dubbo%3D2.0.0%26injvm%3Dfalse%26interface%3Dcom.alibaba.dubbo.config.api.DemoService%26methods%3DsayName%2CgetUsers%2Cecho%2CthrowDemoException%2CgetBox%26pid%3D2264%26register.ip%3D192.168.5.5%26side%3Dconsumer%26timestamp%3D1520518336336®istry=multicast×tamp=1520518337358
//组装URL
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
//忽略若干行代码
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
return (T) proxyFactory.getProxy(invoker);
引用服务,实质是组装一个DubboInvoker
//refprotocol.refer(interfaceClass, url)源码
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
而将多个服务如何包装成一个服务?dubbo中有一层目录结构,directory,将多个invoker组装成一个FailoverClusterInvoker服务,实质就是一个invoker服务。
cluster.join(new StaticDirectory(invokers));
//cluster定义,用到spi
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
//FailoverClusterInvoker定义
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T>
//AbstractClusterInvoker定义
public abstract class AbstractClusterInvoker<T> implements Invoker<T>
代理对象如何产生?这里和服务发布一样,使用proxyFactory获取一个代理对象,实质返回的是 InvokerInvocationHandler
对象。
return (T) proxyFactory.getProxy(invoker);
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
这里有一个问题,当我们在调用的过程中,发现实际上调用的InvokerInvocationHandler
对象,并不是FailoverClusterInvoker
,而是MockClusterInvoker
,这是为什么呢?是wrapper扩展点加载机制导致的。后面再介绍。
fyi
网友评论