本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
文章内容顺序:
- 什么是集群容错
- 1.1 集群容错以及Cluster概念的介绍
- 1.2 集群容错的总体工作流程
- 1.3 Directory的简单介绍
- Cluster和ClusterInvoker的结构及代码分析
- 2.1 Cluster的UML图
- 2.2 以FailoverCluster为例的Cluster介绍
- ClusterInvoker的代码分析
- 3.1 ClusterInvoker的工作原理
- 3.2 AbstractClusterInvoker#invoke
- 3.3 FailoverClusterInvoker#doInvoke
- 3.4 AbstractClusterInvoker#select
- 3.5 AbstractClusterInvoker#doselect
- 3.6 FailbackClusterInvoker
- 3.7 FailfastClusterInvoker
- 3.8 FailsafeClusterInvoker
- 3.9 ForkingClusterInvoker
- 3.10 BroadcastClusterInvoker
1. 什么是集群容错
1.1 集群容错以及Cluster概念的介绍
为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。
这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。
为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。
这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。
image.png
这张图相信都看到过,这次要聊的就是Cluster层的这几个东西,也就是整套集群容错机制。
1.2 集群容错的总体工作流程
image.png
在这之前先说说Cluster的总体工作流程,
- 1.先生成Invoker对象,根据不同的Cluster实现生成不同类型的ClusterInvoker(这个就是服务引用阶段)
- 2.调用list从Directory中获取可用的服务列表(从这一步开始真正的调用流程),接着使用Router接口根据路由规则过滤一部分服务后最终返回服务列表
- 3.调用select做负载均衡,通过不同的负载均衡策略选出一个服务作为最后的调用
- 4.调用invoke做RPC调用,对于调用出现异常、成功、失败等情况,每种容错机制会有不同的处理方式。
1.3 Directory的简单介绍
书接上文服务引用,在
ReferenceConfig#createproxy
或者在RegistryProtocol#doRefer
中,会有Cluster 合并多个Invoker
的操作,如下,这些Invoker
是同一个接口在不同Provider
上的这个方法的实体,如我们上文提到的,服务通常是以集群的形式出现。下图中这个directory
可以看成是一个Invoker
的容器,在下面的Directory
章节中会细讲。image.png
2. Cluster结构及代码分析
2.1 Cluster的UML图
那么我们就来看看这个Cluster是何方神圣吧。

我们可以看到,每个 Cluster 实现类,对应一个
专属
于其的 Invoker 实现类。
有关Cluster的扩展包装类MockClusterWrapper
会在Dubbo集群容错——Mock中细讲,这边只说包装类里面的各个Cluster实现。
下面,我们一个一个子类往下看。
2.2 以FailoverCluster为例的Cluster介绍
FailoverCluster
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 创建并返回 FailoverClusterInvoker 对象
return new FailoverClusterInvoker<T>(directory);
}
}
其他的Cluster实现也是一样,join方法就直接返回了一个XXXClusterInvoker
所以接下来,我们把重点放在各种 Cluster Invoker 上
3. ClusterInvoker的代码分析
3.1 ClusterInvoker的工作原理
我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。前面说过,集群工作过程可分为两个阶段,
- 第一个阶段是在服务消费者初始化期间的join。
- 第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 >invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。也就是各种 Cluster Invoker的具体实现。
因此下面先来看一下 invoke 方法的逻辑。
3.2 AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
// 校验是否销毁
checkWhetherDestroyed();
// 获得所有服务提供者 Invoker 集合
List<Invoker<T>> invokers = list(invocation);
// 获得 LoadBalance 对象
LoadBalance loadbalance;
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 若是异步调用,设置调用编号
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行调用
return doInvoke(invocation, invokers, loadbalance);
}
// 抽象方法,由子类实现
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
AbstractClusterInvoker#invoke
方法主要用于列举Invoker
,以及加载LoadBalance
。最后再调用模板方法doInvoke
进行后续操作。
Invoker
列举方法list(Invocation)
的逻辑,主要就是调用Directory#list
方法,留到Directory
篇再做介绍。
接下来,我们把目光转移到 AbstractClusterInvoker 的各种实现类上,来看一下这些实现类是如何实现 doInvoke 方法逻辑的。
3.3 FailoverClusterInvoker#doInvoke
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
// 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(copyinvokers, invocation);
// 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 保存最后一次调用的异常
// retry loop.
RpcException le = null; // last exception.
// 保存已经调用过的Invoker
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// failover机制核心实现:如果出现调用失败,那么重试其他服务器
for (int i = 0; i < len; i++) {
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
if (i > 0) {
checkWhetherDestroyed();
// 根据Invocation调用信息从Directory中获取所有可用Invoker
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 根据负载均衡机制从copyinvokers中选择一个Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 保存每次调用的Invoker
invoked.add(invoker);
// 设置已经调用的 Invoker 集合到RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// RPC 调用得到 Result
Result result = invoker.invoke(invocation);
// 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 如果是业务性质的异常,不再重试,直接抛出
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他性质的异常统一封装成RpcException
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息
throw new RpcException(le.getCode(), "Failed to invoke the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers "
+ providers + " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}
FailoverClusterInvoker#doInvoke
方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。
在 for 循环内,首先是通过负载均衡组件选择一个Invoker
,然后再通过这个Invoker
的invoke
方法进行远程调用。
如果失败了,记录下异常,并进行重试。重试时会再次调用AbstractClusterInvoker#list
方法列举 Invoker。(这个方法会调用到Directory#list
)
注意上面代码中有调用一个select
方法来通过负载均衡选择Invoker
这个select
是写在父类AbstractClusterInvoker
里的,来看看他是怎么负载均衡的。
3.4 AbstractClusterInvoker#select
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty()) {
return null;
}
// 获得 sticky 配置项,方法级
String methodName = invocation == null ? "" : invocation.getMethodName();
// 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
// 调用同一个服务提供者,除非该提供者挂了再进行切换
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
// 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
// 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
// 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 不包含
// stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
// 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
// availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的
// isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
// 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
// 此时继续调用 doSelect 选择 Invoker
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
// 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
粘滞连接的概念:粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。
粘滞连接将自动开启延迟连接,以减少长连接数。
- select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测
invokers
列表中是否包含stickyInvoker
,如果不包含,则认为该stickyInvoker
不可用,此时将其置空。- 这里的
invokers
列表可以看做是存活着的服务提供者列表,如果这个列表不包含stickyInvoker
,那自然而然的认为stickyInvoker
挂了,所以置空。- 如果
stickyInvoker
存在于invokers
列表中,此时要进行下一项检测 — 检测selected
中是否包含stickyInvoker
。如果包含的话,说明stickyInvoker
在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该stickyInvoker
。- 如果
selected
不包含stickyInvoker
,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。- 当可用性检测通过,才可返回
stickyInvoker
,否则调用doSelect
方法选择Invoker
。如果sticky
为true
,此时会将doSelect
方法选出的 Invoker 赋值给stickyInvoker
。
来看一下他doSelect 选择 Invoker是怎么做的
3.5 AbstractClusterInvoker#doselect
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.isEmpty()) {
return null;
}
// 【第一种】如果只有一个 Invoker ,直接选择
if (invokers.size() == 1) {
return invokers.get(0);
}
// 【第二种】如果只有两个 Invoker ,退化成轮循
// If we only have two invokers, use round-robin instead.
if (invokers.size() == 2 && selected != null && !selected.isEmpty()) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// 【第三种】使用 Loadbalance ,选择一个 Invoker 对象。
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
// 如果 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
//【第四种】重选一个 Invoker 对象
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
// Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
// 【第五种】看下第一次选的位置,如果不是最后,选+1位置.
int index = invokers.indexOf(invoker);
try {
// Avoid collision
// 最后在避免碰撞
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到 reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。
第一种:如果只有一个候选的 Invoker 对象,直接选择返回
第二种:如果只有两个候选的 Invoker 集合,退化为轮询。此处存在一个 BUG :
转载自我飞哥,《dubbo 源码 - 负载均衡》这里退化成轮询的实现有问题,对应源码
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
如果retries=4,即最多调用5次,且两个可选invoke分别为:10.0.0.1:20884,10.0.0.1:20886;
那么5次选择的invoke为:
- 10.0.0.1:20884
- 10.0.0.1:20886
- 10.0.0.1:20886
- 10.0.0.1:20886
- 10.0.0.1:20886,
即除了第1次外后面的选择都是选择第二个invoker;
因次需要把
selected.get(0)
修改为:selected.get(selected.size()-1)
;即每次拿前一次选择的
invoker
与invokers.get(0)
比较,如果相同,则选则另一个invoker
;否则就选invokers.get(0)
;第三种:调用
Loadbalance#select(invokers, url, invocation)
方法,使用 Loadbalance ,选择一个 Invoker 对象。
第四种:调用#reselect(loadbalance, invocation, invokers, selected, availablecheck)
方法,重新选择一个 Invoker 对象。
第五种:顺序从候选的 invokers 集合中,选择一个 Invoker 对象,不考虑是否可用,或者已经选择过
这个loadbalance
对象,从AbstractClusterInvoker#invoke
通过SPI加载以来,在各个方法里传了这么久,终于在AbstractClusterInvoker#doselect
里调用了,我们会在Dubbo集群容错——LoadBalance中详细分析
FailoverClusterInvoker
就此也讲的差不多了,接下来看看其他的ClusterInvoker
3.6 FailbackClusterInvoker
ublic class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final long RETRY_FAILED_PERIOD = 5 * 1000;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 检查 invokers 即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(invokers, invocation);
// 根据负载均衡机制从 invokers 中选择一个Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// RPC 调用得到 Result
return invoker.invoke(invocation);
} catch (Throwable e) {
// 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常
logger.error("Failback to invoke method ...");
// 记录调用信息
addFailed(invocation, this);
// 返回一个空结果给服务消费者
return new RpcResult();
}
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
// 创建定时任务,每隔5秒执行一次
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 对失败的调用进行重试
retryFailed();
} catch (Throwable t) {
// 如果发生异常,仅打印异常日志,不抛出
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
// 添加 invocation 和 invoker 到 failed 中
failed.put(invocation, router);
}
void retryFailed() {
if (failed.size() == 0) {
return;
}
// 遍历 failed,对失败的调用进行重试
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 再次进行调用
invoker.invoke(invocation);
// 调用成功后,从 failed 中移除 invoker
failed.remove(invocation);
} catch (Throwable e) {
// 仅打印异常,不抛出
logger.error("Failed retry to invoke method ...");
}
}
}
}
这个类主要由3个方法组成,首先是
doInvoker
,该方法负责初次的远程调用。若远程调用失败,则通过addFailed
方法将调用信息存入到failed
中,等待定时重试。
addFailed 在开始阶段会根据retryFuture
为空与否,来决定是否开启定时任务。
retryFailed
方法则是包含了失败重试的逻辑,该方法会对failed
进行遍历,然后依次对Invoker
进行调用。调用成功则将Invoker
从failed
中移除,调用失败则忽略失败原因。
3.7 FailfastClusterInvoker
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
// 检查 invokers 即可用Invoker集合是否为空,如果为空,那么抛出异常
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// RPC 调用得到 Result
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
// 若是业务性质的异常,直接抛出
throw (RpcException) e;
}
// 封装 RpcException 异常,并抛出
throw new RpcException(..., "Failfast invoke providers ...");
}
}
}
FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。
3.8 FailsafeClusterInvoker
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 检查 invokers 即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(invokers, invocation);
// 根据负载均衡机制从 invokers 中选择一个Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 进行远程调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 打印错误日志,但不抛出
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 返回空结果忽略错误
return new RpcResult();
}
}
}
FailsafeClusterInvoker
是一种失败安全的Cluster Invoker
。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker
仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。
3.9 ForkingClusterInvoker
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 检查 invokers 即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(invokers, invocation);
// 保存选择的 Invoker 集合
final List<Invoker<T>> selected;
// 得到最大并行数,默认为 Constants.DEFAULT_FORKS = 2
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
// 获得调用超时时间,默认为 DEFAULT_TIMEOUT = 1000 毫秒
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 若最大并行书小于等于 0,或者大于 invokers 的数量,直接使用 invokers
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
// 循环,根据负载均衡机制从 invokers,中选择一个个Invoker ,从而组成 Invoker 集合。
// 注意,因为增加了排重逻辑,所以不能保证获得的 Invoker 集合的大小,小于最大并行数
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// 在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) { //Avoid add the same invoker several times. //防止重复添加invoker
selected.add(invoker);
}
}
}
// 设置已经调用的 Invoker 集合,到 Context 中
RpcContext.getContext().setInvokers((List) selected);
// 异常计数器
final AtomicInteger count = new AtomicInteger();
// 创建阻塞队列
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
// 循环 selected 集合,提交线程池,发起 RPC 调用
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
// RPC 调用,获得 Result 结果
Result result = invoker.invoke(invocation);
// 添加 Result 到 `ref` 阻塞队列
ref.offer(result);
} catch (Throwable e) {
// 异常计数器 + 1
int value = count.incrementAndGet();
// 若 RPC 调结果都是异常,则添加异常到 `ref` 阻塞队列
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
// 从 `ref` 队列中,阻塞等待结果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
// 若是异常结果,抛出 RpcException 异常
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
// 若是正常结果,直接返回
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
}
}
ForkingClusterInvoker
会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke
方法就会立即结束运行。ForkingClusterInvoker
的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。
注意这一段代码
catch (Throwable e) {
// 异常计数器 + 1
int value = count.incrementAndGet();
// 若 RPC 调结果都是异常,则添加异常到 `ref` 阻塞队列
if (value >= selected.size()) {
ref.offer(e);
}
}
在并行调用多个服务提供者的情况下,只要有一个服务提供者能够成功返回结果,而其他全部失败。此时
ForkingClusterInvoker
仍应该返回成功的结果,而非抛出异常。在value >= selected.size()
时将异常对象放入阻塞队列中,可以保证异常对象不会出现在正常结果的前面,这样可从阻塞队列中优先取出正常的结果。
3.10 BroadcastClusterInvoker
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 检查 invokers 即可用Invoker集合是否为空,如果为空,那么抛出异常
checkInvokers(invokers, invocation);
// 设置已经调用的 Invoker 集合,到 Context 中
RpcContext.getContext().setInvokers((List) invokers);
// 保存最后一次调用的异常
RpcException exception = null;
// 保存最后一次调用的结果
Result result = null;
// 循环候选的 Invoker 集合,调用所有 Invoker 对象。
for (Invoker<T> invoker : invokers) {
try {
// 发起 RPC 调用
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e); // 封装成 RpcException 异常
logger.warn(e.getMessage(), e);
}
}
// 若存在一个异常,抛出该异常
if (exception != null) {
throw exception;
}
return result;
}
}
BroadcastClusterInvoker
会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker
会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。
网友评论