美文网首页
第12章 Dubbo 集群容错的设计与实现

第12章 Dubbo 集群容错的设计与实现

作者: 原水寒 | 来源:发表于2019-08-19 07:51 被阅读0次
    image.png

    本节介绍 Dubbo 十层架构中的 Cluster 层。首先来看 Cluster 层包含的组件及其调用关系:Cluster、ClusterInvoker、Directory、Router、LoadBalance。


    image.png
    1. 首先使用 Cluster(eg. FailoverCluster) 创建一个具体的 ClusterInvoker(eg. FailoverClusterInvoker);(Cluster 是 ClusterInvoker 的抽象工厂
    2. 之后调用 ClusterInvoker#list 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表,然后 Directory 会调用其包含的 List<Router> 进行过滤(此时可实现读写分离等);
    3. 之后调用 ClusterInvoker#select:根据 SPI 机制获取 LoadBalance 实例,之后使用 LoadBalance 实例从 2 中过滤出的 List<Invoker> 中选出最终的一个 Invoker;
    4. 最后调用 ClusterInvoker#doInvoke:调用 3 中选出的 Invoker#invoke(...),发起远程调用。

    一、Cluster 接口

    image.png

    Dubbo 提供了八种不同姿势的 Cluster(分别实现了不同的集群容错策略):

    • FailoverCluster(默认):当请求失败时(如果是抛出业务异常 bizException,则直接抛出异常 RpcException,不再重试),否则重试其他服务器。
    • 可以通过 retries = "2" 来设置重试次数,重试次数为 retries + 1
    • 使用场景:读操作或幂等的写操作。
    • FailfastCluster:当请求失败时,直接抛出 RpcException。
    • 使用场景:非幂等操作
    • FailbackCluster:当请求失败时,会记录在失败列表中,并由一个定时线程池进行定时重试(每5s执行一次重试),如果重试成功,则从失败列表中删除,否则,记录error日志,下一个5s继续重试(如果一直失败,会一直重试)。
    • 使用场景:异步或最终一致性,eg. 通知服务。
    • FailsafeCluster:当请求失败时,会忽略异常。即不关心响应结果的成功与否。
    • 使用场景:eg. 写日志。
    • AvailableCluster:遍历所有节点,找到第一个可用节点,直接请求并返回响应;如果没有可用节点,则抛出异常。(不会对请求做负载均衡,对于 DubboInvoker 来讲,可用 = Invoker 没有被 destroy && 至少有一个 NettyClient 处于已连接状态 && 不是只读)
    • ForkingCluster:同时并行调用多个相同的服务(使用异步线程池),此时主线程等待返回结果(默认等待1s,可通过 timeout 参数进行配置),只要其中一个返回,则立即返回结果;如果全失败了,则抛出异常;如果 timeout 内没返回响应结果,则抛出 timeout 异常
    • 通过配置 forks = "2"(默认为2) 来指定并行调用的服务数量
    • 通过配置 timeout="5000"(默认为1000,即1s)来指定主线程等待并行异步线程返回结果的时间
    • 使用场景:对实时性要求极高。
    • BroadcastCluster:广播调用所有可用的服务,任意一个节点报错则报错
    • MergeableCluster:自动把对多个节点请求的结果进行合并。

    Dubbo 还提供了一种 ClusterWrapper(MockClusterWrapper),其包含一个具体的 Cluster(eg. FailoverCluster),通过 AOP 的方式实现了 mock 操作。

    @SPI(FailoverCluster.NAME) // 默认 FailoverCluster
    public interface Cluster {
        /**
         * 创建 ClusterInvoker,该 ClusterInvoker 持有 Directory 实例。
         * 实际上就是将 Directory 中的多个 Invoker 封装成了一个 Invoker(ClusterInvoker)。
         * 
         * Cluster 是 ClusterInvoker 的抽象工厂。
         */
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    }
    

    Cluster 是 ClusterInvoker 的抽象工厂,用于创建 ClusterInvoker,该 ClusterInvoker 持有 Directory 实例,实际上就是将 Directory 中的多个 Invoker 封装成了一个 Invoker(ClusterInvoker)。

    以 FailoverCluster 和 FailfastCluster 为例。

    public class FailoverCluster implements Cluster {
        public final static String NAME = "failover";
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    }
    
    public class FailfastCluster implements Cluster {
        public final static String NAME = "failfast";
        @Override
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailfastClusterInvoker<T>(directory);
        }
    }
    

    二、ClusterInvoker

    image.png

    针对每一种 Cluster,Dubbo 都提供了对应的 ClusterInvoker(该 ClusterInvoker 由对应的 Cluster 来创建),其中 MockClusterInvoker 和 MergeableClusterInvoker 较为特殊,直接实现了 Invoker 接口;其他的 ClusterInvoker 都继承与模板基类 AbstractClusterInvoker,复写了其中的 doInvoker(...) 方法。

    MockClusterInvoker 和 MergeableClusterInvoker 较为特殊,后续分析。

    AbstractClusterInvoker 模板基类

    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
        protected final Directory<T> directory;
        ...
        public AbstractClusterInvoker(Directory<T> directory) {
            this(directory, directory.getUrl());
        }
        ...
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            ...
            LoadBalance loadbalance = null;
            ...
            /**
             * 1. 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表
             *    然后 Directory 会调用其包含的 List<Router> 进行过滤
             */
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && !invokers.isEmpty()) {
                /**
                 * 2. 根据 SPI 机制获取 LoadBalance 实例
                 */
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
            }
            ...
            /**
             * 3. 调用子类实现负载均衡选择和rpc调用
             */
            return doInvoke(invocation, invokers, loadbalance);
        }
    
        /**
         * 从 Directory(eg. RegistryDirectory)中根据方法名获取具体的 Invoker(eg. DubboInvoker(filtered))列表,
         * 然后 Directory 会调用其包含的 List<Router> 进行过滤
         */
        protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            return directory.list(invocation);
        }
    
        /**
         * 调用子类实现负载均衡选择和rpc调用
         */
        protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                           LoadBalance loadbalance) throws RpcException;
    
        /**
         * 选择一个 Invoker。
         * a) 首先, 使用 loadbalance 选择一个 Invoker。如果该 Invoker 之前已经被选择了(在 selected List 中)或者该 Invoker 不可用(eg. DubboInvoker,netty 没有处于连接状态),则进行重新选择操作(reselect);否则返回直接返回该 Invoker;
         * b) 重新选择操作(reselect):
         *     1. 使用 lb 从可用的且之前没有被选择的Invoker列表中选择一个,如果成功,直接返回,否则;
         *     2. 使用 lb 从可用的且之前被选择的Invoker列表中选择一个,如果成功,直接返回
         * c) 如果 reselect 还没有成功的话,则从 invokers(被 List<Router> 过滤过的)列表中选出 a) 中的选出的 Invoker 的下一个 Invoker
         */
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            // 1. 处理 sticky 逻辑
            ...
            // 2. 真正的进行选择
            Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
            ...
            return invoker;
        }
    
        private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            ...
            // 如果只有一个 Invoker,直接返回,否则进行选择操作
            if (invokers.size() == 1) {
                return invokers.get(0);
            }
            if (loadbalance == null) {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
            }
            // 1. 使用 loadbalance 进行选择
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            // 2. 如果该 Invoker 之前已经被选择了(在 selected List 中)或者该 Invoker 不可用(eg. DubboInvoker,netty 没有处于连接状态),则进行重新选择操作(reselect(...))
            if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                // 2.1. 进行重新选择,如果选择成功,则直接返回;否则获取当前选出的 Invoker 的下一个 Invoker
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    int index = invokers.indexOf(invoker);
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                }
            }
            return invoker;
        }
    
        /**
         * 重新选择操作(reselect):
         * 1. 使用 lb 从可用的且之前没有被选择的Invoker列表中选择一个,如果成功,直接返回,否则;
         * 2. 使用 lb 从可用的且之前被选择的Invoker列表中选择一个,如果成功,直接返回
         */
        private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
            ...
        }
        ...
    }
    

    选择部分的流程相对复杂,流程图如下:

    image.png

    1、FailoverClusterInvoker

    三种使用姿势
    <!-- 1. 所有的消费者使用 FailoverCluster,且重试次数为6次(retries+1) -->
    <dubbo:consumer cluster="failover" retries="5"/>
    <!-- 2. 指定的消费者的所有方法使用 FailoverCluster,且重试次数为6次(retries+1) -->
    <dubbo:reference ... cluster="failover" retries="5"/>
    <!-- 3. 指定的消费者使用 FailoverCluster,且指定方法 sayHello 的重试次数为5次(retries+1),其余方法使用默认值 -->
    <dubbo:reference ... cluster="failover">
        <dubbo:method name="sayHello" retries="5"/>
    </dubbo:reference>
    
    • retries 默认为2,即重试3次
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
        ...
        RpcException le = null; // last exception.
        @Override
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            // 默认重试3次(retries+1)
            int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
            ...
            // 已选择列表
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
            for (int i = 0; i < len; i++) {
                // 每一次重新选择都要重置 copyinvokers:重新从 Directory 根据方法名进行 Invoker(filtered) 的获取。
                // 因为有可能此时提供者列表已经发生了变化,提供者列表发生变化,Directory 也会更新其缓存的 Invoker(filtered) 列表。
                if (i > 0) {
                    copyinvokers = list(invocation);
                }
                // 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                // 2. 将选择出来的 Invoker 添加到已选择列表中,供下一次选择时使用
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    // 3. 发起 rpc 调用,如果抛出异常(业务异常,直接抛出,不再重试),否则,catch 住,进行下一个 Invoker 的重试
                    Result result = invoker.invoke(invocation);
                    return result;
                } catch (RpcException e) {
                    if (e.isBiz()) { // biz exception.
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    le = new RpcException(e.getMessage(), e);
                }
            }
            throw new RpcException(...le.getCause());
        }
    }
    

    2、FailbackClusterInvoker

    public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
        /**
         * 重试线程池
         */
        private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedInternalThreadFactory("failback-cluster-timer", true));
        /**
         * 调用失败列表
         */
        private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
        private volatile ScheduledFuture<?> retryFuture;
        ...
        private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
            // 1. 双重检测创建并启动定时重试任务
            if (retryFuture == null) {
                synchronized (this) {
                    if (retryFuture == null) {
                        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    retryFailed();
                                } catch (Throwable t) { // Defensive fault tolerance
                                    logger.error("Unexpected error occur at collect statistic", t);
                                }
                            }
                        }, 5 * 1000, 5 * 1000, TimeUnit.MILLISECONDS);
                    }
                }
            }
            // 2. 将失败的调用加入到失败列表中
            failed.put(invocation, router);
        }
    
        void retryFailed() {
            if (failed.size() == 0) {
                return;
            }
            for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
                Invocation invocation = entry.getKey();
                Invoker<?> invoker = entry.getValue();
                try {
                    // 对于失败的 Invoker,进行重试:如果重试成功,则从失败列表中删除,否则,记录error日志,下一个5s继续重试(如果一直失败,会一直重试)
                    invoker.invoke(invocation);
                    failed.remove(invocation);
                } catch (Throwable e) {
                    logger.error(...);
                }
            }
        }
    
        @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                // 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                // 2. 发起 rpc 调用,如果失败,添加到失败列表,后续进行定时重试。返回空 RpcResult
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                        + e.getMessage() + ", ", e);
                // 如果失败,添加到失败列表,后续进行定时重试
                addFailed(invocation, this);
                return new RpcResult(); // ignore
            }
        }
    }
    

    3、FailfastClusterInvoker

    public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
        ...
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            try {
                // 2. 发起 rpc 调用,如果失败,直接抛出异常
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                ...
                throw new RpcException(...);
            }
        }
    }
    

    4、FailsafeClusterInvoker

    public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
        ...
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                // 1. 调用父类 AbstractClusterInvoker 的选择机制进行选择
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                // 2. 发起 rpc 调用,如果失败,直接忽略,返回空的 RpcResult
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                return new RpcResult(); // ignore
            }
        }
    }
    

    5、AvailableClusterInvoker

    public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {
        ...
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // 找出第一个可用的 Invoker,直接发起 rpc 调用(没有负载均衡)
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            throw new RpcException("No provider available in " + invokers);
        }
    }
    

    对于 DubboInvoker,可用的逻辑如下:

        @Override
        public boolean isAvailable() {
            // AbstractInvoker:Invoker 没有被 destroy,则可用
            if (!super.isAvailable()) {
                return false;
            }
            // 只要有一个 NettyClient 处于已连接状态 && 不是只读,则可用
            for (ExchangeClient client : clients) {
                if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
                    return true;
                }
            }
            return false;
        }
    

    6、ForkingClusterInvoker

    两种使用姿势
    <!-- 1. 所有的消费者使用ForkingCluster,且同时并发调用3个Invoker(forks),并且等待结果返回时间为5s -->
    <dubbo:consumer cluster="forking" forks="3" timeout="5000"/>
    <!-- 2. 指定的消费者使用ForkingCluster,且同时并发调用3个Invoker(forks),并且等待结果返回时间为5s -->
    <dubbo:reference ... cluster="forking" forks="3" timeout="5000"/>
    
    public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
        /** 异步并行调用线程池 */
        private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
        ...
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> selected; // 选择多少个Invoker进行调用
            int forks = getUrl().getParameter("forks", 2);
            int timeout = getUrl().getParameter("timeout", 1000);
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers; // 选择全部Invoker进行调用
            } else {
                selected = new ArrayList<Invoker<T>>();
                // 1. 多次调用父类 AbstractClusterInvoker 的选择机制进行 Invoker 的选择,并添加到 List<Invoker<T>> selected 中
                for (int i = 0; i < forks; i++) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) { // 去重操作
                        selected.add(invoker);
                    }
                }
            }
            ...
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
            for (final Invoker<T> invoker : selected) {
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            // 2. 使用线程池并行发起多次调用,并将调用结果存储到ref阻塞队列中;如果全部调用都失败了,则将失败异常也存储到ref中
                            Result result = invoker.invoke(invocation);
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            // 全部调用都失败了
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            // 3. 主线程从阻塞队列弹出第一个对象(阻塞等待1s,即1s内如果调用无法完成,则抛出 timeout 异常;如果1s内有任一个调用返回,则直接处理),如果得到的是异常,则直接抛出,否则,返回调用结果
            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
            if (ret instanceof Throwable) {
                throw new RpcException(...ret...);
            }
            return (Result) ret;
        }
    }
    

    7、BroadcastClusterInvoker

    public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
        ...
        @Override
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            RpcException exception = null;
            Result result = null;
            // 对每一个节点进行调用,只要有一个失败,直接抛出异常
            for (Invoker<T> invoker : invokers) {
                try {
                    result = invoker.invoke(invocation);
                }  catch (Throwable e) {
                    exception = new RpcException(e.getMessage(), e);
                }
            }
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    }
    

    8、MergeableCluster

    在合并器 merger 部分进行分析。

    9、MockClusterInvoker

    在服务降级 mock 部分进行分析。

    相关文章

      网友评论

          本文标题:第12章 Dubbo 集群容错的设计与实现

          本文链接:https://www.haomeiwen.com/subject/pibmjctx.html