美文网首页
dubbo源码3-cluster

dubbo源码3-cluster

作者: modou1618 | 来源:发表于2019-01-02 22:26 被阅读0次

仅consumer端会有cluster层的处理,在RegistryProtocol.refer()返回使用RegistryDirectory对象(见注册中心)初始化的cluster层Invoker,返回给配置解析层,交给spring管理。

3.1 路由实现

  • 条件路由
    基于url属性信息作为条件进行provider路由选择
  • 脚本路由
    ⽀持 JDK 脚本引擎的脚本所实现的路由算法
  • 文件路由
    文件保存脚本程序,实际转换成脚本路由实现

3.2 动态配置

3.3 负载均衡

除一致性hash策略外其他负载均衡策略都基于provider权重.获取权重代码如下,其中:

  • WARMUP_KEY表示是否支持预热,获取的是服务预热时间,默认预热时间10分钟。
  • 服务上线时间超过预热时间则使用配置的接口权重。
  • 服务上线时间小于预热时间,则接口权重=(上线时间/预热时间)*权重。 最小为1,最大为配置的接口权重。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }

3.3.1 最少活跃

  • ActiveLimitFilter消费端过滤器统计并更新RpcStatus接口信息。
    int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); 获取provider的活跃调用数。
  • 取最少活跃数的provider。
  • 如有多个相同最少活跃数,则按照3.3.2随机策略选择provider。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int leastActive = -1; // The least active value of all invokers
        int leastCount = 0; // The number of invokers having the same least active value (leastActive)
        int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
        int totalWeight = 0; // The sum of with warmup weights
        int firstWeight = 0; // Initial value, used for comparision
        boolean sameWeight = true; // Every invoker has the same weight value?
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
            int afterWarmup = getWeight(invoker, invocation); // Weight
            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                leastActive = active; // Record the current least active value
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                leastIndexs[0] = i; // Reset
                totalWeight = afterWarmup; // Reset
                firstWeight = afterWarmup; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexs[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offsetWeight = random.nextInt(totalWeight) + 1;
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

3.3.2 轮询

每个invoker有一个固定权重值,和一个选择权重值。每次invoker取选择权重最大的invoker。

  • 若落选,则选择权重+=固定权重。
  • 若选中,则选择权重-=所有invoker固定权重总和。
    即权重高的会更快的优先被选中。

3.3.3 随机

  • 若权重相同则随机选取。
  • 若权重不同,则以总权重值为区间随机选一个数,前n个provider权重大于随机值,则选择第n个provider
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = random.nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
        
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(random.nextInt(length));

3.3.4 一致性hash

  • 以treemap存储hash值
  • 每个provider节点分解成默认160个虚拟节点,计算hash值后存储到treemap中
  • treemap.tailMap()获取目标hash的后续节点,存在则选择第一个。不存在则选择整个treemap的第一个。

3.4 失败策略

策略 说明
FailoverClusterInvoker 失败换provider重试
FailFastClusterInvoker 快速失败,只尝试一次
FailSafeClusterInvoker 失败忽略,纪录日志
FailBackClusterInvoker 失败重试,重走provider list,负载均衡筛选
BroadcastClusterInvoker 广播,一个失败则失败
ForkClusterInvoker 多个同时发,一个成功则成功
MergeClusterInvoker 多个同时发,获取结果后merge返回
AvailableClusterInvoker 第一个有效的provider

3.5 接口调用

cluster层接口调用流程.png

相关文章

网友评论

      本文标题:dubbo源码3-cluster

      本文链接:https://www.haomeiwen.com/subject/uiptrqtx.html