本文基于dubbo的默认配置来说明:注册中心:zk rpc协议:dubbo协议
基于前两篇文章的分析。comsumer在启动的时候需要根据dubbo的配置文件启动spring容器,并用自定义命名空间和自定义<dubbo::?>类型节点解析器,来完成对消费接口的注入。当读取到配置文件的reference节点时,会创建一个ReferenceBean的bean对象。ReferenceBean继承自spirng的FactoryBean接口。因此当我们调用以下程序时,返回的对象demoService
由ReferenceBean#getObject
来获得:
DemoService demoService = (DemoService) context.getBean("demoService");
核心代码在ReferenceConfig#createProxy
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
关键步骤就一处:invoker = refprotocol.refer(interfaceClass, url);
获取实际的消费者调用接口时的实际处理对象invoker
我们看下这个流程:ExtensionLoader
根据配置的url(registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?...),选取到了RegistryProtocol#refer
来生成调用链对象。定义了一个RegistryDirectory
对象directory
,将消费者注册到zk上(创建一个临时节点),同时订阅该接口对应的zk节点变化通知。生成一个携带directory
的FailoverClusterInvoker
类型对象返回。(注意:FailoverClusterInvoker对象只需要一个)。

provider
注册到zk时,携带了自身地址,接口信息,启动时间戳,pid,weight权重等信息。FailoverClusterInvoker
中的变量directory
在监听到provider的节点信息变化时,会将相应的url转化成invoker保存下来,url是如何转化成invoker的?核心实现位于RegistryDirectory#Map<String, Invoker<T>> toInvokers(List<URL> urls)
,provider采用dubbo协议提供服务的情况下,则根据url定位到协议处理时invoker是DubboProtocol#refer
生成的。根据拿到的url中的provider地址,用netty创建了一个client链接,放入DubboInvoker
。同时将新生成的DubboInvoker
放入FailoverClusterInvoker#directory
存起来。

当用户调用dubbo方法的时候,会从FailoverClusterInvoker#directory
中取到所有该reference接口的DubboInvoker
链接,用配置的负载均衡算法选取一条DubboInvoker
来执行invoke
方法,根据上述流程可知,每条DubboInvoker
携带着一个netty客户端,调用invoke时用客户端发起请求报文并同步等待结果,最终将结果返回给调用用户。

Protocol装饰器
,之前有说过,用ExtensionLoader
的,获取代理实例的时候,还会将对应接口的装饰器加到返回的实例上,refprotocol对象实际在返回时还包含了3层装饰器:QosProtocolWrapper,ProtocolFilterWrapper,ProtocolListenerWrapper。
1.QosProtocolWrapper
QosProtocolWrapper用作provider的服务控制。端口取的配置dubbo.application.qos.port
,默认22222。
QosProtocolWrapper实现了一个netty的telnet服务(用LineBasedFrameDecoder
解码)。
2.5版本的buddo提供的功能有两个:对服务接口或者下线(offline/online +interfaceName);当控制下线时,会删去zk上对应接口的临时节点。这样consumer就不会发现到这个接口了。
负载均衡策略
RandomLoadBalance
RandomLoadBalance
是默认的负载策略,随机从invokers中选择一条,同时支持invoker带有weight。
ConsistentHashLoadBalance
一致性hash算法,用"要消费的服务组+接口"做key,invoker选择器做value。对于每个invoker的地址做hash运算(ip+port+数字),生成160个虚拟节点,放入整个hash环中。这样当以相同的参数多次调用时,就可以打到相同的服务了(作用:相同的参数用同一个服务消费,如果provider做了redis缓存,LRU之类会更加节省调用时间)。对一个任意参数的hash值,都会落在这些分割好的节点中间,向后选一个最靠近的虚拟节点对应的provider来消费。为什么每个地址生成了这么多虚拟节点呢?因为一个节点的随机性太大了,在provider数量比较少的时候,会导致大部分数据都打到相同的provider上。
RoundRobinLoadBalance
轮询,内部维护一个自增id,取(id%总权重)的值来判断当次执行应该选取哪个provider。
LeastActiveLoadBalance
每次选取活跃次数最少的链路。效果和轮训差不多,略去。
接口超时
我们可以在consumer配置中配置timeout
(单位ms)参数来控制接口超时时间,具体逻辑:在consumer给服务发送报文时,会记录报文Id。启动一个线程定时轮训每个会话报文是否超时,超时的话直接调用DefaultFuture#received
给出一个超时Response。
失败策略
调用接口可能失败,可能由于链接还没有成功,也可能接口超时未返回,这种情况下会产生一个RpcException
,dubbo中有如下的策略可供选择来处理这些异常情况。
FailoverClusterInvoker
FailoverClusterInvoker
是默认的失败策略。如果调用失败了,会重新调用负载均衡算法尝试其他provider。可以在reference节点中配置retries
来控制重试次数。
FailfastClusterInvoker
失败一次直接返回
FailbackClusterInvoker
5s后再次尝试,尝试retries
次。
FailsafeClusterInvoker
调用一次,忽略错误
BroadcastClusterInvoker
广播调用,只有所有provider都成功了才算成功。
异步调用
异步调用的示例代码如下:
Future<String> f = RpcContext.getContext().asyncCall(new Callable<String>() {
@Override
public String call() {
return demoService.sayHello("123");
}
});
String result = f.get();
对象f是一个FutureAdapter
类型的Future对象,内部存储了一个DefaultFuture
对象,当调用provider接口返回数据时,用Hessian2
库根据接口的返回类型反序列化二进制数据,并触发条件变量。这样调用DefaultFuture#get
就可以返回结果了。序列化部分后续分析。
网友评论