模式 | 简述 | 说明 | 适用场景 |
---|---|---|---|
FailfastClusterInvoker | 快速失败 | 直接负载均衡器选择出Invoker进行调用,调用失败直接抛异常,只调用一次 | 适用于幂等操作 |
FailsafeClusterInvoker | 失败安全 | 和FailfastClusterInvoker差不多,只是失败只是记录日志 | 适用于写入审计日志等 |
FailbackClusterInvoker | 失败异步重试 | 失败则记录下来,然后适用线程池的定时器每个5s进行重试 | 适合执行消息通知 |
ForkingClusterInvoker | 并行调用 | 负载均衡选择出一个Invoker后,根据配置的线程个数开始多个线程进行调用,将调用结果放入到LinkedBlockingQueue队列中,然后通过Poll方法将结果取出进行返回,这里有个小细节,即多线程调用过程中,失败了并不是立马将异常放入到队列,而是等value >= selected.size()才进行放入,主要是为了保证取出结果时能优先取到正常结果 | 适用于对读性能较高的场景 |
BroadcastClusterInvoker | 广播调用 | 不需要负载均衡选出唯一的Invoker而是将所有Invoker都进行调用 | 通常用于通知所有提供者更新缓存或日志等本地资源信息 |
**FailoverCluster ** | 失败转移 | 同步的进行失败重试,即使用一个for循环进行调用,这里需要注意的是有可能Invoker列表重试过程中会发生变化,所以每次重试前都需要检查一下 | 默认的集群策略 |
源码分析
FailfastClusterInvoker
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//检查提供者列表是否发生变化
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 调用 Invoker
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
// 抛出异常
throw (RpcException) e;
}
// 抛出异常
throw new RpcException(..., "Failfast invoke providers ...");
}
}
}
比较简单,不再赘述
FailsafeClusterInvoker
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 进行远程调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 打印错误日志,但不抛出
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 返回空结果忽略错误
return new RpcResult();
}
}
}
比较简单,不再赘述
FailbackClusterInvoker
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final long RETRY_FAILED_PERIOD = 5 * 1000;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
new NamedInternalThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 进行调用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 如果调用过程中发生异常,此时仅打印错误日志,不抛出异常
logger.error("Failback to invoke method ...");
// 记录调用信息
addFailed(invocation, this);
// 返回一个空结果给服务消费者
return new RpcResult();
}
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
// 创建定时任务,每隔5秒执行一次
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
// 对失败的调用进行重试
retryFailed();
} catch (Throwable t) {
// 如果发生异常,仅打印异常日志,不抛出
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
// 添加 invocation 和 invoker 到 failed 中
failed.put(invocation, router);
}
void retryFailed() {
if (failed.size() == 0) {
return;
}
// 遍历 failed,对失败的调用进行重试
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 再次进行调用
invoker.invoke(invocation);
// 调用成功后,从 failed 中移除 invoker
failed.remove(invocation);
} catch (Throwable e) {
// 仅打印异常,不抛出
logger.error("Failed retry to invoke method ...");
}
}
}
}
即失败时addFailed,然后定时器开始retryFailed,该方法内的逻辑及时遍历 failed,对失败的调用进行重试,调用成功后,从 failed 中移除 invoker
ForkingClusterInvoker
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
private final ExecutorService executor = Executors.newCachedThreadPool(
new NamedInternalThreadFactory("forking-cluster-timer", true));
@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
// 获取 forks 配置
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
// 获取超时配置
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 如果 forks 配置不合理,则直接将 invokers 赋值给 selected
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
// 循环选出 forks 个 Invoker,并添加到 selected 中
for (int i = 0; i < forks; i++) {
// 选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
selected.add(invoker);
}
}
}
// ----------------并行调用,并将结果放入阻塞队列------------------------- //
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
// 遍历 selected 列表
for (final Invoker<T> invoker : selected) {
// 为每个 Invoker 创建一个执行线程
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 进行远程调用
Result result = invoker.invoke(invocation);
// 将结果存到阻塞队列中
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
// 仅在 value 大于等于 selected.size() 时,才将异常对象
if (value >= selected.size()) {
// 将异常对象存入到阻塞队列中
ref.offer(e);
}
}
}
});
}
// -------------------从阻塞队列中取出结果------------------------ //
try {
// 从阻塞队列中取出远程调用结果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
// 如果结果类型为 Throwable,则抛出异常
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(..., "Failed to forking invoke provider ...");
}
// 返回结果
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider ...");
}
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
见注释,不再赘述
BroadcastClusterInvoker
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
// 遍历 Invoker 列表,逐个调用
for (Invoker<T> invoker : invokers) {
try {
// 进行远程调用
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
// exception 不为空,则抛出异常
if (exception != null) {
throw exception;
}
return result;
}
}
注意这里是不需要负载均衡组件,而是直接for循环调用所有Invoker即所有服务提供者 就好了,详细见注释,不再赘述
FailoverCluster
相对稍微复杂一点
首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
// 省略部分代码
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(..., "Failed to invoke the method ...");
}
}
一些问题
并行调用
会浪费资源,也会导致重复写(因为异步多线程去调用服务提供者)
快速失败
有可能由于网络超时的原因,导致服务提供者其实是执行成功了,只是没有给消费者响应,但消费者的调用策略是只调用一次,而且是快速失败的,这就可能导致不一致
失败同步重试
即FailOver
有可能服务提供者也是成功了,只是由于网络超时,而没有给响应,这时消费者快速切换至了其它提供者.所以有可能导致重复写.
这几个问题留在负载均衡选择那些看吧
网友评论