美文网首页
dubbo源码3-cluster

dubbo源码3-cluster

作者: modou1618 | 来源:发表于2019-01-02 22:26 被阅读0次

    仅consumer端会有cluster层的处理,在RegistryProtocol.refer()返回使用RegistryDirectory对象(见注册中心)初始化的cluster层Invoker,返回给配置解析层,交给spring管理。

    3.1 路由实现

    • 条件路由
      基于url属性信息作为条件进行provider路由选择
    • 脚本路由
      ⽀持 JDK 脚本引擎的脚本所实现的路由算法
    • 文件路由
      文件保存脚本程序,实际转换成脚本路由实现

    3.2 动态配置

    3.3 负载均衡

    除一致性hash策略外其他负载均衡策略都基于provider权重.获取权重代码如下,其中:

    • WARMUP_KEY表示是否支持预热,获取的是服务预热时间,默认预热时间10分钟。
    • 服务上线时间超过预热时间则使用配置的接口权重。
    • 服务上线时间小于预热时间,则接口权重=(上线时间/预热时间)*权重。 最小为1,最大为配置的接口权重。
    protected int getWeight(Invoker<?> invoker, Invocation invocation) {
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
            if (weight > 0) {
                long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    int uptime = (int) (System.currentTimeMillis() - timestamp);
                    int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                    if (uptime > 0 && uptime < warmup) {
                        weight = calculateWarmupWeight(uptime, warmup, weight);
                    }
                }
            }
            return weight;
        }
    

    3.3.1 最少活跃

    • ActiveLimitFilter消费端过滤器统计并更新RpcStatus接口信息。
      int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); 获取provider的活跃调用数。
    • 取最少活跃数的provider。
    • 如有多个相同最少活跃数,则按照3.3.2随机策略选择provider。
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            int length = invokers.size(); // Number of invokers
            int leastActive = -1; // The least active value of all invokers
            int leastCount = 0; // The number of invokers having the same least active value (leastActive)
            int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
            int totalWeight = 0; // The sum of with warmup weights
            int firstWeight = 0; // Initial value, used for comparision
            boolean sameWeight = true; // Every invoker has the same weight value?
            for (int i = 0; i < length; i++) {
                Invoker<T> invoker = invokers.get(i);
                int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
                int afterWarmup = getWeight(invoker, invocation); // Weight
                if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                    leastActive = active; // Record the current least active value
                    leastCount = 1; // Reset leastCount, count again based on current leastCount
                    leastIndexs[0] = i; // Reset
                    totalWeight = afterWarmup; // Reset
                    firstWeight = afterWarmup; // Record the weight the first invoker
                    sameWeight = true; // Reset, every invoker has the same weight value?
                } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                    leastIndexs[leastCount++] = i; // Record index number of this invoker
                    totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
                    // If every invoker has the same weight?
                    if (sameWeight && i > 0
                            && afterWarmup != firstWeight) {
                        sameWeight = false;
                    }
                }
            }
            // assert(leastCount > 0)
            if (leastCount == 1) {
                // If we got exactly one invoker having the least active value, return this invoker directly.
                return invokers.get(leastIndexs[0]);
            }
            if (!sameWeight && totalWeight > 0) {
                // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
                int offsetWeight = random.nextInt(totalWeight) + 1;
                // Return a invoker based on the random value.
                for (int i = 0; i < leastCount; i++) {
                    int leastIndex = leastIndexs[i];
                    offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                    if (offsetWeight <= 0)
                        return invokers.get(leastIndex);
                }
            }
            // If all invokers have the same weight value or totalWeight=0, return evenly.
            return invokers.get(leastIndexs[random.nextInt(leastCount)]);
        }
    

    3.3.2 轮询

    每个invoker有一个固定权重值,和一个选择权重值。每次invoker取选择权重最大的invoker。

    • 若落选,则选择权重+=固定权重。
    • 若选中,则选择权重-=所有invoker固定权重总和。
      即权重高的会更快的优先被选中。

    3.3.3 随机

    • 若权重相同则随机选取。
    • 若权重不同,则以总权重值为区间随机选一个数,前n个provider权重大于随机值,则选择第n个provider
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
            
            // If all invokers have the same weight value or totalWeight=0, return evenly.
            return invokers.get(random.nextInt(length));
    

    3.3.4 一致性hash

    • 以treemap存储hash值
    • 每个provider节点分解成默认160个虚拟节点,计算hash值后存储到treemap中
    • treemap.tailMap()获取目标hash的后续节点,存在则选择第一个。不存在则选择整个treemap的第一个。

    3.4 失败策略

    策略 说明
    FailoverClusterInvoker 失败换provider重试
    FailFastClusterInvoker 快速失败,只尝试一次
    FailSafeClusterInvoker 失败忽略,纪录日志
    FailBackClusterInvoker 失败重试,重走provider list,负载均衡筛选
    BroadcastClusterInvoker 广播,一个失败则失败
    ForkClusterInvoker 多个同时发,一个成功则成功
    MergeClusterInvoker 多个同时发,获取结果后merge返回
    AvailableClusterInvoker 第一个有效的provider

    3.5 接口调用

    cluster层接口调用流程.png

    相关文章

      网友评论

          本文标题:dubbo源码3-cluster

          本文链接:https://www.haomeiwen.com/subject/uiptrqtx.html