Dubbo集群容错策略
Dubbo支持的集群容错策略包括:
- Available Cluster,可用调用
- Broadcast Cluster,广播调用
- Failback Cluster,失败自动恢复
- Failfast Cluster,快速失败
- Failover Cluster,失败重试
- Failsafe Cluster,安全失败
- Forking Cluster,并行调用
- Mergeable Cluster,聚合调用
可用调用,遍历服务提供方获取第一个可用的服务来进行调用,其核心代码如下所示,遍历所有的invokers,找到第一个可用的invoker直接返回
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
不同注册中心的isAvailable()方法实现不一样,以zookeeper和redis注册中心为例,
//ZookeeperRegistry
public boolean isAvailable() {
//判断是否已连接
return zkClient.isConnected();
}
//RedisRegistry
public boolean isAvailable() {
for (JedisPool jedisPool : jedisPools.values()) {
try {
//判断是否有可用的redis连接
Jedis jedis = jedisPool.getResource();
try {
if (jedis.isConnected()) {
return true;
}
} finally {
jedisPool.returnResource(jedis);
}
} catch (Throwable t) {
}
}
return false;
}
不同注册中心的实现思路差不多,都是需要获取可用的连接。
广播调用,当消费者调用一个远程接口时,Dubbo Client会依次调用该接口的所有服务提供者,任意一个服务调用失败则标志服务调用失败,常用于通知所有提供者更新缓存等本地信息。循环调用所有的invokers,如果有一个调用失败,那么整个调用抛出异常,标志该调用失败。
//定义Rpc调用的异常
RpcException exception = null;
Result result = null;
//遍历所有invokers
for (Invoker<T> invoker : invokers) {
try {
//依次调用
result = invoker.invoke(invocation);
} catch (RpcException e) {
//捕获异常
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
//存在异常直接抛出
if (exception != null) {
throw exception;
}
失败自动恢复,当服务端调用失败后,会在后台记录请求失败的请求,并按照一定的策略后期进行重试,这种模式常用于消息通知等操作。这种策略会对失败的调用做后期补偿,将调用失败的记录保存起来,后续根据一定的策略进行重试,其核心代码如下所示,
Invoker<T> invoker = null;
try {
//检查invokers,主要进行null判断
checkInvokers(invokers, invocation);
//根据负载均衡选择一个invoker
invoker = select(loadbalance, invocation, invokers, null);
//进行远程调用
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
//如果调用失败,存在异常,将本次调用保存起来
addFailed(loadbalance, invocation, invokers, invoker);
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
快速失败,服务调用失败后,立即报错,这种模式用于非幂等的写操作。当远程调用出现异常,直接将异常抛出,其核心代码如下所示
//检查invokers,主要进行null判断
checkInvokers(invokers, invocation);
//根据负载均衡选择一个invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
//捕获异常抛出,不做人任何处理
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
当消费方调用服务失败后,会自动切换去请求其他服务提供者,可以设置重试次数,通常用于读操作或者幂等的写操作,其核心源码如下所示,其核心步骤均在源码基础上进行了相应的注释。
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//获取重试次数,重试次数+1就是最多的调用次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
//上一次调用失败的异常
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
Set<String> providers = new HashSet<String>(len);
//for循环重试次数+1次,只要调用调用成功直接返回
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
//获取当前可用的invokers
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
//根据负载均衡选择一个invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
//将选择的invoker放入invoked列表,后续负载均衡选择invoker时会避免选择已经选择过得invoker
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
//记录最后一次调用失败的异常,如果最后调用重试次数+1次后仍然失败,则抛出最后一次失败调用的异常
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
安全失败,当调用远程服务出现异常时,直接忽略异常,通常用于无关紧要的业务,如日志记录。其核心代码如下所示,
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
//调用失败,忽略异常直接正常返回
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation);
}
并行调用,当消费方进行服务调用时,会并行调用多个服务提供者,只要有一个服务提供者返回即可,通常用于对实时性要求比较高的读操作,虽然提高了响应时间,但是浪费了更多的服务器资源,典型的空间换时间的操作。其核心代码如下所示,首先获取并行数和调用超时时间,通过负载均衡选择被调用的invoker列表,将这些invoker的调用放入线程池中处理,并将调用结果放入阻塞队列中,从阻塞队列中获取返回结果直至超时时间,返回异常信息获取成功调用的结果。
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
//获取并行数量,同时调用多少个invoker
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
//支持超时时间
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
//通过负载均衡选取invokers
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>(forks);
while (selected.size() < forks) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
//存储调用结果的阻塞队列
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
//将所有选择的invoker放入线程池中,并将调用结果存储在ref中
for (final Invoker<T> invoker : selected) {
executor.execute(() -> {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
try {
//从ref中不断尝试获取调用结果直至超时时间timeout
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
//如果调用异常直接抛出
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
//成功调用返回调用结果
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
聚合调用,合并结果集,通常和group一起使用。
实现自定义的集群容错策略
自定义的集群容错策略,如Dubbo提供的Forking Cluster并行调用策略,该策略并行的调用若干个服务提供者,返回第一个完成调用的结果,但是并没有关心调用是否成功,这里对这种策略进行一下优化,要求超时时间内忽略失败的调用结果,返回第一个成功调用的结果,如果都调用失败或者超时则抛出异常。
要实现上述功能,首先需要对Dubbo中的Cluster架构了解。
Dubbo中Cluster被定义为一个扩展点,默认为FailoverCluster,说明我们能够实现该扩展点,自定义Cluster的逻辑。
@SPI(FailoverCluster.NAME)
public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
Dubbo源码中通过org.apache.dubbo.rpc.cluster.Cluster文件指定了10中集群容错的策略,大多数上文中均有介绍。
org.apache.dubbo.rpc.cluster.Cluster文件
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
以FailoverCluster为例,其源码如下所示
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}
}
其类结构为
FailoverCluster继承自AbstractCluster,AbstractCluster实现了Cluster接口,实现了一些基本功能,FailoverCluster的doJoin方法返回一个FailoverClusterInvoker,FailoverClusterInvoker类的继承关系如下所示
FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker实现了Invoker接口。
当我们需要实现自定义的CustomizedCluster时,同样是要继承AbstractCluster类,
public class CustomizedCluster extends AbstractCluster {
public final static String NAME = "failover";
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new CustomizedClusterInvoker<>(directory);
}
}
然后编写CustomizedClusterInvoker类
public class CustomizedClusterInvoker<T> extends AbstractClusterInvoker<T> {
/**
* Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
* which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
*/
private final ExecutorService executor = Executors.newCachedThreadPool(
new NamedInternalThreadFactory("forking-cluster-timer", true));
public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>(forks);
while (selected.size() < forks) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
executor.execute(() -> {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
});
}
RpcException le = null;
try {
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
while(ret != null) {
//忽略异常返回
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
le = e;
ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
continue;
}
}
return (Result) ret;
} catch (InterruptedException e) {
if(le == null) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
throw le;
}
} finally {
// clear attachments which is binding to current thread.
RpcContext.getContext().clearAttachments();
}
}
}
总结
本文主要介绍了Dubbo中的集群容错机制,Dubbo中提供了十来种容错机制,可以通过不同的业务逻辑选取合适的容错机制,集群对象Cluster是一个扩展点,我们可以实现该扩展点实现了定制化的容错逻辑,本文针对Dubbo提供的ForkingCluster进行了优化。
写在最后
源码路上,共勉!!!
网友评论