美文网首页
Dubbo集群容错策略简介

Dubbo集群容错策略简介

作者: 花醉霜寒 | 来源:发表于2020-07-07 11:36 被阅读0次

    Dubbo集群容错策略

    Dubbo支持的集群容错策略包括:

    • Available Cluster,可用调用
    • Broadcast Cluster,广播调用
    • Failback Cluster,失败自动恢复
    • Failfast Cluster,快速失败
    • Failover Cluster,失败重试
    • Failsafe Cluster,安全失败
    • Forking Cluster,并行调用
    • Mergeable Cluster,聚合调用

    \color{green}{Available Cluster}

    可用调用,遍历服务提供方获取第一个可用的服务来进行调用,其核心代码如下所示,遍历所有的invokers,找到第一个可用的invoker直接返回

        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
    

    不同注册中心的isAvailable()方法实现不一样,以zookeeper和redis注册中心为例,

        //ZookeeperRegistry
        public boolean isAvailable() {
            //判断是否已连接
            return zkClient.isConnected();
        }
    
        //RedisRegistry
        public boolean isAvailable() {
            for (JedisPool jedisPool : jedisPools.values()) {
                try {
                    //判断是否有可用的redis连接
                    Jedis jedis = jedisPool.getResource();
                    try {
                        if (jedis.isConnected()) {
                            return true; 
                        }
                    } finally {
                        jedisPool.returnResource(jedis);
                    }
                } catch (Throwable t) {
                }
            }
            return false;
        }
    
    

    不同注册中心的实现思路差不多,都是需要获取可用的连接。

    \color{green}{Broadcast Cluster}

    广播调用,当消费者调用一个远程接口时,Dubbo Client会依次调用该接口的所有服务提供者,任意一个服务调用失败则标志服务调用失败,常用于通知所有提供者更新缓存等本地信息。循环调用所有的invokers,如果有一个调用失败,那么整个调用抛出异常,标志该调用失败。

        //定义Rpc调用的异常
        RpcException exception = null;
        Result result = null;
        //遍历所有invokers
        for (Invoker<T> invoker : invokers) {
            try {
                //依次调用
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                //捕获异常
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        //存在异常直接抛出
        if (exception != null) {
            throw exception;
        }
    

    \color{green}{Failback Cluster}

    失败自动恢复,当服务端调用失败后,会在后台记录请求失败的请求,并按照一定的策略后期进行重试,这种模式常用于消息通知等操作。这种策略会对失败的调用做后期补偿,将调用失败的记录保存起来,后续根据一定的策略进行重试,其核心代码如下所示,

        Invoker<T> invoker = null;
        try {
            //检查invokers,主要进行null判断
            checkInvokers(invokers, invocation);
            //根据负载均衡选择一个invoker
            invoker = select(loadbalance, invocation, invokers, null);
            //进行远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            //如果调用失败,存在异常,将本次调用保存起来
            addFailed(loadbalance, invocation, invokers, invoker);
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    

    \color{green}{Failfast Cluster}

    快速失败,服务调用失败后,立即报错,这种模式用于非幂等的写操作。当远程调用出现异常,直接将异常抛出,其核心代码如下所示

        //检查invokers,主要进行null判断
        checkInvokers(invokers, invocation);
        //根据负载均衡选择一个invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        //捕获异常抛出,不做人任何处理
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    

    \color{green}{Failover Cluster}

    当消费方调用服务失败后,会自动切换去请求其他服务提供者,可以设置重试次数,通常用于读操作或者幂等的写操作,其核心源码如下所示,其核心步骤均在源码基础上进行了相应的注释。

            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            //获取重试次数,重试次数+1就是最多的调用次数
            int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            //上一次调用失败的异常
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); 
            Set<String> providers = new HashSet<String>(len);
            //for循环重试次数+1次,只要调用调用成功直接返回
            for (int i = 0; i < len; i++) {
                if (i > 0) {
                    checkWhetherDestroyed();
                    //获取当前可用的invokers
                    copyInvokers = list(invocation);
                    // check again
                    checkInvokers(copyInvokers, invocation);
                }
                //根据负载均衡选择一个invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                //将选择的invoker放入invoked列表,后续负载均衡选择invoker时会避免选择已经选择过得invoker
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    //记录最后一次调用失败的异常,如果最后调用重试次数+1次后仍然失败,则抛出最后一次失败调用的异常
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
    

    \color{green}{Failsafe Cluster}

    安全失败,当调用远程服务出现异常时,直接忽略异常,通常用于无关紧要的业务,如日志记录。其核心代码如下所示,

        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            //调用失败,忽略异常直接正常返回
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); 
        }
    

    \color{green}{Forking Cluster}

    并行调用,当消费方进行服务调用时,会并行调用多个服务提供者,只要有一个服务提供者返回即可,通常用于对实时性要求比较高的读操作,虽然提高了响应时间,但是浪费了更多的服务器资源,典型的空间换时间的操作。其核心代码如下所示,首先获取并行数和调用超时时间,通过负载均衡选择被调用的invoker列表,将这些invoker的调用放入线程池中处理,并将调用结果放入阻塞队列中,从阻塞队列中获取返回结果直至超时时间,返回异常信息获取成功调用的结果。

        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            //获取并行数量,同时调用多少个invoker
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            //支持超时时间
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //通过负载均衡选取invokers
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>(forks);
                while (selected.size() < forks) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            //存储调用结果的阻塞队列
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            //将所有选择的invoker放入线程池中,并将调用结果存储在ref中
            for (final Invoker<T> invoker : selected) {
                executor.execute(() -> {
                    try {
                        Result result = invoker.invoke(invocation);
                        ref.offer(result);
                    } catch (Throwable e) {
                        int value = count.incrementAndGet();
                        if (value >= selected.size()) {
                            ref.offer(e);
                        }
                    }
                });
            }
            try {
                //从ref中不断尝试获取调用结果直至超时时间timeout
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                //如果调用异常直接抛出
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                //成功调用返回调用结果
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    

    \color{green}{Mergeable Cluster}

    聚合调用,合并结果集,通常和group一起使用。

    实现自定义的集群容错策略

    自定义的集群容错策略,如Dubbo提供的Forking Cluster并行调用策略,该策略并行的调用若干个服务提供者,返回第一个完成调用的结果,但是并没有关心调用是否成功,这里对这种策略进行一下优化,要求超时时间内忽略失败的调用结果,返回第一个成功调用的结果,如果都调用失败或者超时则抛出异常。
    要实现上述功能,首先需要对Dubbo中的Cluster架构了解。

    Dubbo中Cluster被定义为一个扩展点,默认为FailoverCluster,说明我们能够实现该扩展点,自定义Cluster的逻辑。

    @SPI(FailoverCluster.NAME)
    public interface Cluster {
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    }
    

    Dubbo源码中通过org.apache.dubbo.rpc.cluster.Cluster文件指定了10中集群容错的策略,大多数上文中均有介绍。
    org.apache.dubbo.rpc.cluster.Cluster文件

    mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
    failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
    failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
    failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
    failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
    forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
    available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
    mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
    broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
    zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
    

    以FailoverCluster为例,其源码如下所示

    public class FailoverCluster extends AbstractCluster {
    
        public final static String NAME = "failover";
    
        @Override
        public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<>(directory);
        }
    
    }
    

    其类结构为



    FailoverCluster继承自AbstractCluster,AbstractCluster实现了Cluster接口,实现了一些基本功能,FailoverCluster的doJoin方法返回一个FailoverClusterInvoker,FailoverClusterInvoker类的继承关系如下所示


    FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker实现了Invoker接口。

    当我们需要实现自定义的CustomizedCluster时,同样是要继承AbstractCluster类,

    public class CustomizedCluster extends AbstractCluster {
    
        public final static String NAME = "failover";
    
        @Override
        public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
            return new CustomizedClusterInvoker<>(directory);
        }
    
    }
    

    然后编写CustomizedClusterInvoker类

    public class CustomizedClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        /**
         * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
         * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
         */
        private final ExecutorService executor = Executors.newCachedThreadPool(
                new NamedInternalThreadFactory("forking-cluster-timer", true));
    
        public ForkingClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                final List<Invoker<T>> selected;
                final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
                final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                if (forks <= 0 || forks >= invokers.size()) {
                    selected = invokers;
                } else {
                    selected = new ArrayList<>(forks);
                    while (selected.size() < forks) {
                        Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                        if (!selected.contains(invoker)) {
                            //Avoid add the same invoker several times.
                            selected.add(invoker);
                        }
                    }
                }
                RpcContext.getContext().setInvokers((List) selected);
                final AtomicInteger count = new AtomicInteger();
                final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
                for (final Invoker<T> invoker : selected) {
                    executor.execute(() -> {
                        try {
                            Result result = invoker.invoke(invocation);
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    });
                }
                RpcException le = null; 
                try {
                    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                    while(ret != null) {
                        //忽略异常返回
                        if (ret instanceof Throwable) {
                            Throwable e = (Throwable) ret;
                            le = e;
                            ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                            continue;
                        }
                    }
                    return (Result) ret;
                } catch (InterruptedException e) {
                    if(le == null) {
                        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
                    }
                    throw le;
                }
            } finally {
                // clear attachments which is binding to current thread.
                RpcContext.getContext().clearAttachments();
            }
        }
    }
    
    

    总结
    本文主要介绍了Dubbo中的集群容错机制,Dubbo中提供了十来种容错机制,可以通过不同的业务逻辑选取合适的容错机制,集群对象Cluster是一个扩展点,我们可以实现该扩展点实现了定制化的容错逻辑,本文针对Dubbo提供的ForkingCluster进行了优化。

    写在最后
    源码路上,共勉!!!

    相关文章

      网友评论

          本文标题:Dubbo集群容错策略简介

          本文链接:https://www.haomeiwen.com/subject/rzkgqktx.html