美文网首页
Dubbo集群容错策略简介

Dubbo集群容错策略简介

作者: 花醉霜寒 | 来源:发表于2020-07-07 11:36 被阅读0次

Dubbo集群容错策略

Dubbo支持的集群容错策略包括:

  • Available Cluster,可用调用
  • Broadcast Cluster,广播调用
  • Failback Cluster,失败自动恢复
  • Failfast Cluster,快速失败
  • Failover Cluster,失败重试
  • Failsafe Cluster,安全失败
  • Forking Cluster,并行调用
  • Mergeable Cluster,聚合调用

\color{green}{Available Cluster}

可用调用,遍历服务提供方获取第一个可用的服务来进行调用,其核心代码如下所示,遍历所有的invokers,找到第一个可用的invoker直接返回

    for (Invoker<T> invoker : invokers) {
        if (invoker.isAvailable()) {
            return invoker.invoke(invocation);
        }
    }

不同注册中心的isAvailable()方法实现不一样,以zookeeper和redis注册中心为例,

    //ZookeeperRegistry
    public boolean isAvailable() {
        //判断是否已连接
        return zkClient.isConnected();
    }

    //RedisRegistry
    public boolean isAvailable() {
        for (JedisPool jedisPool : jedisPools.values()) {
            try {
                //判断是否有可用的redis连接
                Jedis jedis = jedisPool.getResource();
                try {
                    if (jedis.isConnected()) {
                        return true; 
                    }
                } finally {
                    jedisPool.returnResource(jedis);
                }
            } catch (Throwable t) {
            }
        }
        return false;
    }

不同注册中心的实现思路差不多,都是需要获取可用的连接。

\color{green}{Broadcast Cluster}

广播调用,当消费者调用一个远程接口时,Dubbo Client会依次调用该接口的所有服务提供者,任意一个服务调用失败则标志服务调用失败,常用于通知所有提供者更新缓存等本地信息。循环调用所有的invokers,如果有一个调用失败,那么整个调用抛出异常,标志该调用失败。

    //定义Rpc调用的异常
    RpcException exception = null;
    Result result = null;
    //遍历所有invokers
    for (Invoker<T> invoker : invokers) {
        try {
            //依次调用
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            //捕获异常
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    //存在异常直接抛出
    if (exception != null) {
        throw exception;
    }

\color{green}{Failback Cluster}

失败自动恢复,当服务端调用失败后,会在后台记录请求失败的请求,并按照一定的策略后期进行重试,这种模式常用于消息通知等操作。这种策略会对失败的调用做后期补偿,将调用失败的记录保存起来,后续根据一定的策略进行重试,其核心代码如下所示,

    Invoker<T> invoker = null;
    try {
        //检查invokers,主要进行null判断
        checkInvokers(invokers, invocation);
        //根据负载均衡选择一个invoker
        invoker = select(loadbalance, invocation, invokers, null);
        //进行远程调用
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        //如果调用失败,存在异常,将本次调用保存起来
        addFailed(loadbalance, invocation, invokers, invoker);
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
    }

\color{green}{Failfast Cluster}

快速失败,服务调用失败后,立即报错,这种模式用于非幂等的写操作。当远程调用出现异常,直接将异常抛出,其核心代码如下所示

    //检查invokers,主要进行null判断
    checkInvokers(invokers, invocation);
    //根据负载均衡选择一个invoker
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    //捕获异常抛出,不做人任何处理
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                        + " select from all providers " + invokers + " for service " + getInterface().getName()
                        + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion()
                        + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                e.getCause() != null ? e.getCause() : e);
    }

\color{green}{Failover Cluster}

当消费方调用服务失败后,会自动切换去请求其他服务提供者,可以设置重试次数,通常用于读操作或者幂等的写操作,其核心源码如下所示,其核心步骤均在源码基础上进行了相应的注释。

        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        //获取重试次数,重试次数+1就是最多的调用次数
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        //上一次调用失败的异常
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); 
        Set<String> providers = new HashSet<String>(len);
        //for循环重试次数+1次,只要调用调用成功直接返回
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                //获取当前可用的invokers
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            //根据负载均衡选择一个invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            //将选择的invoker放入invoked列表,后续负载均衡选择invoker时会避免选择已经选择过得invoker
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                //记录最后一次调用失败的异常,如果最后调用重试次数+1次后仍然失败,则抛出最后一次失败调用的异常
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }

\color{green}{Failsafe Cluster}

安全失败,当调用远程服务出现异常时,直接忽略异常,通常用于无关紧要的业务,如日志记录。其核心代码如下所示,

    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        //调用失败,忽略异常直接正常返回
        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); 
    }

\color{green}{Forking Cluster}

并行调用,当消费方进行服务调用时,会并行调用多个服务提供者,只要有一个服务提供者返回即可,通常用于对实时性要求比较高的读操作,虽然提高了响应时间,但是浪费了更多的服务器资源,典型的空间换时间的操作。其核心代码如下所示,首先获取并行数和调用超时时间,通过负载均衡选择被调用的invoker列表,将这些invoker的调用放入线程池中处理,并将调用结果放入阻塞队列中,从阻塞队列中获取返回结果直至超时时间,返回异常信息获取成功调用的结果。

    try {
        checkInvokers(invokers, invocation);
        final List<Invoker<T>> selected;
        //获取并行数量,同时调用多少个invoker
        final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
        //支持超时时间
        final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        //通过负载均衡选取invokers
        if (forks <= 0 || forks >= invokers.size()) {
            selected = invokers;
        } else {
            selected = new ArrayList<>(forks);
            while (selected.size() < forks) {
                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                if (!selected.contains(invoker)) {
                    //Avoid add the same invoker several times.
                    selected.add(invoker);
                }
            }
        }
        RpcContext.getContext().setInvokers((List) selected);
        final AtomicInteger count = new AtomicInteger();
        //存储调用结果的阻塞队列
        final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        //将所有选择的invoker放入线程池中,并将调用结果存储在ref中
        for (final Invoker<T> invoker : selected) {
            executor.execute(() -> {
                try {
                    Result result = invoker.invoke(invocation);
                    ref.offer(result);
                } catch (Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            });
        }
        try {
            //从ref中不断尝试获取调用结果直至超时时间timeout
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            //如果调用异常直接抛出
            if (ret instanceof Throwable) {
                Throwable e = (Throwable) ret;
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
            }
            //成功调用返回调用结果
            return (Result) ret;
        } catch (InterruptedException e) {
            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
        }
    } finally {
        // clear attachments which is binding to current thread.
        RpcContext.getContext().clearAttachments();
    }

\color{green}{Mergeable Cluster}

聚合调用,合并结果集,通常和group一起使用。

实现自定义的集群容错策略

自定义的集群容错策略,如Dubbo提供的Forking Cluster并行调用策略,该策略并行的调用若干个服务提供者,返回第一个完成调用的结果,但是并没有关心调用是否成功,这里对这种策略进行一下优化,要求超时时间内忽略失败的调用结果,返回第一个成功调用的结果,如果都调用失败或者超时则抛出异常。
要实现上述功能,首先需要对Dubbo中的Cluster架构了解。

Dubbo中Cluster被定义为一个扩展点,默认为FailoverCluster,说明我们能够实现该扩展点,自定义Cluster的逻辑。

@SPI(FailoverCluster.NAME)
public interface Cluster {
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

Dubbo源码中通过org.apache.dubbo.rpc.cluster.Cluster文件指定了10中集群容错的策略,大多数上文中均有介绍。
org.apache.dubbo.rpc.cluster.Cluster文件

mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster

以FailoverCluster为例,其源码如下所示

public class FailoverCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<>(directory);
    }

}

其类结构为



FailoverCluster继承自AbstractCluster,AbstractCluster实现了Cluster接口,实现了一些基本功能,FailoverCluster的doJoin方法返回一个FailoverClusterInvoker,FailoverClusterInvoker类的继承关系如下所示


FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker实现了Invoker接口。

当我们需要实现自定义的CustomizedCluster时,同样是要继承AbstractCluster类,

public class CustomizedCluster extends AbstractCluster {

    public final static String NAME = "failover";

    @Override
    public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
        return new CustomizedClusterInvoker<>(directory);
    }

}

然后编写CustomizedClusterInvoker类

public class CustomizedClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>(forks);
                while (selected.size() < forks) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {
                executor.execute(() -> {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            RpcException le = null; 
            try {
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                while(ret != null) {
                    //忽略异常返回
                    if (ret instanceof Throwable) {
                        Throwable e = (Throwable) ret;
                        le = e;
                        ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                        continue;
                    }
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                if(le == null) {
                    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
                }
                throw le;
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
}

总结
本文主要介绍了Dubbo中的集群容错机制,Dubbo中提供了十来种容错机制,可以通过不同的业务逻辑选取合适的容错机制,集群对象Cluster是一个扩展点,我们可以实现该扩展点实现了定制化的容错逻辑,本文针对Dubbo提供的ForkingCluster进行了优化。

写在最后
源码路上,共勉!!!

相关文章

网友评论

      本文标题:Dubbo集群容错策略简介

      本文链接:https://www.haomeiwen.com/subject/rzkgqktx.html