源码分析基于dubbo 2.6.0
ReferenceBean继承了ReferenceConfig, 还实现了FactoryBean。
spring启动时,会通过FactoryBean.getObject创建bean。这里会调用到ReferenceConfig.createProxy
private T createProxy(Map<String, String> map) {
...
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
} else {
// 构建url,放入到urls中
...
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
return (T) proxyFactory.getProxy(invoker);
}
refprotocol.refer会调用到RegistryProtocol.refer。它会调用doRefer方法。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
...
// 订阅
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;
}
这里只关注cluster.join
,因为它创建了invoker。
cluster默认为FailoverCluster
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
但FailoverCluster处理前,要经过装饰者MockClusterWrapper
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
前面dubbo 注册说过了,订阅会调用到RegistryDirectory.notify,进而调用refreshInvoker方法。
private void refreshInvoker(List<URL> invokerUrls) {
...
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
}
看看toInvokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
for (URL providerUrl : urls) {
...
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
;
return newUrlInvokerMap;
}
protocol.refer会调用DubboProtocol.refer,进而启动client端的网络通讯服务(如netty)。
proxyFactory.getProxy
默认通过JavassistProxyFactory实现。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
跟server端处理请求一样,这里也有invoker责任链,节点为InvokerInvocationHandler > MockClusterInvoker > FailoverClusterInvoker
动态类Proxy由com.alibaba.dubbo.common.bytecode.getProxy(ClassLoader cl, Class<?>... ics)
方法生成。跟com.alibaba.dubbo.common.bytecode.Wrapper
一样,拼凑代码字符串,通过Javassist生成动态类。
这里直接看生成动态类,生成动态类的方法体为
Object[] args = new Object[1];
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
就是通过handler来调用业务方法。
先看看InvokerInvocationHandler
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
...
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
Invocation是rpc调用上下文信息,包括methodName,arguments等信息。
invoker的请求调用最后会调用FailoverClusterInvoker,它会调用父类AbstractClusterInvoker
public Result invoke(final Invocation invocation) throws RpcException {
...
LoadBalance loadbalance;
// 路由
List<Invoker<T>> invokers = list(invocation);
// 负载均衡
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 调用请求
return doInvoke(invocation, invokers, loadbalance);
}
list方法会调用AbstractDirectory.list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
RegistryDirectory.doList方法通过Invocation中的methodName,Arguments等信息从methodInvokerMap中获取对应的invokers。(这里的invokers是用于发送网络请求到server进行逻辑处理,而上面说的invoker责任链是用于实现MockCluster/FailoverCluster等扩展功能,注意两者使用场合不同)
Router接口负责实现路由选择操作,Router共有三个实现MockInvokersSelector/ConditionRouter/ScriptRouter。
MockInvokersSelector是专用于处理MOCK请求的。
LoadBalance接口负责实现负载均衡,有RandomLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance/LeastActiveLoadBalance。
AbstractClusterInvoker有FailoverClusterInvoker/FailfastClusterInvoker/FailbackClusterInvoker/AvailableClusterInvoker/...
不同的实现类支持不同的集群容错。
看看默认的FailoverClusterInvoker.doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
// 已调用的invoked
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
...
// 选择invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用invoker,发送网络请求
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(...);
}
注意,如果是server端抛出的业务异常,不会重发请求,只有rpc异常(如连接超时),才会重发请求。
网友评论