美文网首页
14.dubbo源码-负载均衡

14.dubbo源码-负载均衡

作者: 阿飞的博客 | 来源:发表于2017-11-03 17:15 被阅读724次

    LoadBalance

    从LoadBalance的实现类可知,dubbo默认的负载均衡算法总计有4种:

    • 随机算法RandomLoadBalance(默认)
    • 轮训算法RoundRobinLoadBalance
    • 最小活跃数算法LeastActiveLoadBalance
    • 一致性hash算法ConsistentHashLoadBalance

    如何选择负载均衡算法

    调用链从com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker中的invoke(final Invocation invocation)开始:
    --> AbstractClusterInvoker.invoke(Invocation)
    --> AbstractClusterInvoker.doInvoke(invocation, invokers, loadbalance)
    --> FailoverClusterInvoker.doInvoke(Invocation, final List<Invoker<T>>, LoadBalance) (说明,dubbo默认是failover集群实现,且默认最多重试2次,即默认最多for循环调用3次)
    --> AbstractClusterInvoker.select()
    --> AbstractClusterInvoker.doSelect()
    --> AbstractLoadBalance.select()
    --> RoundRobinLoadBalance.doSelect()

    选定Invoker

    一般dubbo服务都有多个Provider,即对Consumer来说有多个可以调用的Invoker,
    根据下面的源码可知,dubbo只根据第一个Invokerinvokers.get(0))确定负载均衡策略,所以选择哪个Invoker取决于List<Invoker<T>> invokers排列顺序,dubbo对Invoker排序的compare方法定义为:o1.getUrl().toString().compareTo(o2.getUrl().toString()));所以如果有10.0.0.1:20884,10.0.0.1:20886,10.0.0.1:20888三个invoke,那么第一个Invoker肯定是10.0.0.1:20884(这个排序算法后面会详细讲解);

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWheatherDestoried();
        LoadBalance loadbalance;  
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            // 排序后的Invoker集合的第一个Invoker(即invokers.get(0))的负载均衡策略就是dubbo选择的策略,默认策略为Constants.DEFAULT_LOADBALANCE,即"random",然后根据扩展机制得到负载均衡算法的实现为com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            // 如果没有任何Invoker,那么直接取默认负载
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
    

    sticky

    sticky:粘的,粘性;
    作用是在调用Invoker的重试过程中,尽可能调用前面选择的Invoker;sticky配置在consumer端,配置方式如下:

    <dubbo:reference id="testService" interface="com.alibaba.dubbo.demo.TestService" retries="2" sticky="true"/>

    sticky业务逻辑主要在如下这段代码中:

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();
        // 判断UR上是否配置了sticky属性,sticky默认为false
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
        {
            //ignore overloaded method
            if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
                stickyInvoker = null;
            }
            //ignore cucurrent problem
            if (sticky && stickyInvoker != null && (selected == null || selected.contains(stickyInvoker))){
                if (availablecheck && stickyInvoker.isAvailable()){
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
        // 如果配置了sticky="true",那么将这次选择到的Invoker缓存下来赋值给stickyInvoker
        if (sticky){
            stickyInvoker = invoker;
        }
        return invoker;
    }
    

    选择LoadBalance

    实现源码AbstractClusterInvoker:

        private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    
            // 如果没有任何可调用服务, 那么返回null
            if (invokers == null || invokers.size() == 0)
                return null;
            // 如果只有1个Invoker(provider),那么直接返回,不需要任何负载均衡算法
            if (invokers.size() == 1)
                return invokers.get(0);
            // 如果只有两个invoker,且处于重新选择(重试)过程中(selected != null && selected.size() > 0),则退化成轮循;
    
            if (invokers.size() == 2 && selected != null && selected.size() > 0) {
                return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
            }
    
            // invokers数量至少3个的话,调用具体的负载均衡实现(例如轮询,随机等)选择Invoker
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
            
            //如果处于重试过程中,且 selected中包含当前选择的invoker(优先判断) 或者 (不可用&&availablecheck=true) 则重试.
            if( (selected != null && selected.contains(invoker))
                    ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
                try{
                    Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if(rinvoker != null){
                        invoker =  rinvoker;
                    }else{
                        //看下第一次选的位置,如果不是最后,选+1位置.
                        int index = invokers.indexOf(invoker);
                        try{
                            //最后在避免碰撞
                            invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
                        }catch (Exception e) {
                            logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
                        }
                    }
                }catch (Throwable t){
                    logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
                }
            }
            return invoker;
        } 
    

    Dubbo的BUG
    这里退化成轮询的实现有问题,对应源码return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);如果retries=4,即最多调用5次,且两个可选invoke分别为:
    10.0.0.1:20884,10.0.0.1:20886;
    那么5次选择的invoke为:
    10.0.0.1:20884
    10.0.0.1:20886
    10.0.0.1:20886
    10.0.0.1:20886
    10.0.0.1:20886,
    即除了第1次外后面的选择都是选择第二个invoker;
    因次需要把selected.get(0)修改为:selected.get(selected.size()-1);
    即每次拿前一次选择的invoker与 invokers.get(0)比较,如果相同,则选则另一个invoker;否则就选 invokers.get(0);

    2. 负载均衡算法的实现

    2.1 随机算法

    RandomLoadBalance.doSelect():

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); //Invoke的总个数
        int totalWeight = 0; // 所有Invoke的总权重
        boolean sameWeight = true; // 各个Invoke权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累加总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 每个invoke与前一次遍历的invoke的权重进行比较,判断所有权重是否一样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果所有invoke的权重有不相同且总权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 确定随机值落在哪个片断上,从而取得对应的invoke,如果weight越大,那么随机值落在其上的概率越大,这个invoke被选中的概率越大
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }
    

    算法说明

    假定有3台dubbo provider:
    10.0.0.1:20884, weight=2
    10.0.0.1:20886, weight=3
    10.0.0.1:20888, weight=4
    随机算法的实现:
    totalWeight=9;
    假设offset=1(即random.nextInt(9)=1)
    1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
    假设offset=4(即random.nextInt(9)=4)
    4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3
    假设offset=7(即random.nextInt(9)=7)
    7-2=5<0?否,这时候offset=5, 5-3=2<0?否,这时候offset=2, 2-4<0?是,所以选中 10.0.0.1:20888, weight=4

    2.2 轮询算法

    RoundRobinLoadBalance.doSelect():

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // key表示方法,例如com.alibaba.dubbo.demo.TestService.getRandomNumber, 即负载均衡算法细化到每个方法的调用;
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // provider的总个数
        int maxWeight = 0; // 最大权重, 只是一个临时值, 所以设置为0, 当遍历invokers时接口的weight肯定大于0,马上就会替换成一个真实的maxWeight的值;
        int minWeight = Integer.MAX_VALUE; // 最小权重,只是一个临时值, 所以设置为Integer类型最大值, 当遍历invokers时接口的weight肯定小于这个数,马上就会替换成一个真实的minWeight的值;
    
        for (int i = 0; i < length; i++) {
            // 从Invoker的URL中获取权重时,dubbo会判断是否warnup了,即只有当invoke这个jvm进程的运行时间超过warnup(默认为10分钟)时间,配置的weight才会生效;
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // 重新计算最大权重值
            minWeight = Math.min(minWeight, weight); // 重新计算最小权重值
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // 如果各provider配置的权重不一样
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
            for (Invoker<T> invoker : invokers) { 
                // 筛选权重大于当前权重基数的Invoker,从而达到给更大权重的invoke加权的目的
                if (getWeight(invoker, invocation) > currentWeight) {
                    weightInvokers.add(invoker);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
    
        // 每个方法对应一个AtomicPositiveInteger,其序数从0开始,
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        // 取模轮循
        return invokers.get(sequence.getAndIncrement() % length);
    }
    

    算法说明

    假定有3台权重都一样的dubbo provider:
    10.0.0.1:20884, weight=100
    10.0.0.1:20886, weight=100
    10.0.0.1:20888, weight=100
    轮询算法的实现:
    其调用方法某个方法(key)的sequence从0开始:
    sequence=0时,选择invokers.get(0%3)=10.0.0.1:20884
    sequence=1时,选择invokers.get(1%3)=10.0.0.1:20886
    sequence=2时,选择invokers.get(2%3)=10.0.0.1:20888
    sequence=3时,选择invokers.get(3%3)=10.0.0.1:20884
    sequence=4时,选择invokers.get(4%3)=10.0.0.1:20886
    sequence=5时,选择invokers.get(5%3)=10.0.0.1:20888

    如果有3台权重不一样的dubbo provider:
    10.0.0.1:20884, weight=50
    10.0.0.1:20886, weight=100
    10.0.0.1:20888, weight=150
    调试过很多次,这种情况下有问题;留一个TODO;

    2.3 最小活跃数算法:

    LeastActiveRobinLoadBalance.doSelect():

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 相同最小活跃数的个数
        int[] leastIndexes = new int[length]; // 相同最小活跃数的下标
        int totalWeight = 0; // 总权重
        int firstWeight = 0; // 第一个权重,用于于计算是否相同
        boolean sameWeight = true; // 是否所有权重相同
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // 活跃数从RpcStatus中取得, dubbo统计活跃数时以方法为维度;
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();         
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
            if (leastActive == -1 || active < leastActive) { // 发现更小的活跃数,重新开始
                leastActive = active; // 记录最小活跃数
                leastCount = 1; // 重新统计相同最小活跃数的个数
                leastIndexes[0] = i; // 重新记录最小活跃数下标
                totalWeight = weight; // 重新累计总权重
                firstWeight = weight; // 记录第一个权重
                sameWeight = true; // 还原权重相同标识
            } else if (active == leastActive) { // 累计相同最小的活跃数
                leastIndexes[leastCount ++] = i; // 累计相同最小活跃数下标
                totalWeight += weight; // 累计总权重
                // 判断所有权重是否一样
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        if (leastCount == 1) {
            // 如果只有一个最小则直接返回
            return invokers.get(leastIndexes[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offsetWeight = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(leastIndexes[random.nextInt(leastCount)]);
    }
    

    算法说明

    最小活跃数算法实现:
    假定有3台dubbo provider:
    10.0.0.1:20884, weight=2,active=2
    10.0.0.1:20886, weight=3,active=4
    10.0.0.1:20888, weight=4,active=3
    active=2最小,且只有一个2,所以选择10.0.0.1:20884

    假定有3台dubbo provider:
    10.0.0.1:20884, weight=2,active=2
    10.0.0.1:20886, weight=3,active=2
    10.0.0.1:20888, weight=4,active=3
    active=2最小,且有2个,所以从[10.0.0.1:20884,10.0.0.1:20886 ]中选择;
    接下来的算法与随机算法类似:
    假设offset=1(即random.nextInt(5)=1)
    1-2=-1<0?是,所以选中 10.0.0.1:20884, weight=2
    假设offset=4(即random.nextInt(5)=4)
    4-2=2<0?否,这时候offset=2, 2-3<0?是,所以选中 10.0.0.1:20886, weight=3

    2.4 一致性hash算法

    实现源码请参考ConsistentHashLoadBalanceRobinLoadBalance.doSelect(),具体步骤如下:

    1. 定义全局一致性hash选择器的ConcurrentMap<String, ConsistentHashSelector<?>> selectors,key为方法名称,例如com.alibaba.dubbo.demo.TestService.getRandomNumber;
    2. 如果一致性hash选择器不存在或者与以前保存的一致性hash选择器不一样(即dubbo服务provider有变化,通过System.identityHashCode(invokers)计算一个identityHashCode值) 则需要重新构造一个一致性hash选择器;
    3. 构造一个一致性hash选择器ConsistentHashSelector的源码如下,通过参数i和h打散Invoker在TreeMap上的位置,replicaNumber默认值为160,所以最终virtualInvokers这个TreeMap的size为invokers.size()*replicaNumber
    for (Invoker<T> invoker : invokers) {
        for (int i = 0; i < replicaNumber / 4; i++) {
            byte[] digest = md5(invoker.getUrl().toFullString() + i);
            for (int h = 0; h < 4; h++) {
                long m = hash(digest, h);
                virtualInvokers.put(m, invoker);
            }
        }
    }
    
    1. 选择Invoker的步骤:

    4.1. 根据Invocation中的参数invocation.getArguments()转成key;
    4.2 算出这个key的md5值;
    4.3 根据md5值的hash值从TreeMap中选择一个Invoker;

    NOTE: 选择Invoker的hash值根据Invocation中的参数得到,那么如果调用的方法没有任何参数,例如getRandomNumber(),那么在可用Invoker集合不变的前提下,任何对该方法的调用都一直返回相同的Invoker;另外,通过对一致性hash算法的分析可知,weight权重设置对其不起作用,只对其他三种负载均衡算法其作用;

    相关文章

      网友评论

          本文标题:14.dubbo源码-负载均衡

          本文链接:https://www.haomeiwen.com/subject/vsotmxtx.html