美文网首页
[Soul 源码之旅] 1.11 Soul的负载均衡

[Soul 源码之旅] 1.11 Soul的负载均衡

作者: AndyWei123 | 来源:发表于2021-02-01 23:29 被阅读0次

    我们上一节介绍 Soul 的 SPI 机制是就解释过,Soul 通过 SPI 机制进行加载负载均衡策略,这一节我们分别来介绍一下三种负载均衡机制。

    1.11.1 准备

    为了测试负载均衡机制,我们需要启动多个客户端进行测试,使用 idea 的同学可以通过配置 run 设置如下


    parallel run

    同时我们还需要修改如下配置端口:


    port
    Soul 中负载均衡的策略都实现了 LoadBlance 接口,具体如下:
    image.png

    我们使用 Divide 插件进行讲述:

    1.11.2 AbstractLoadBalance

    在 AbstractLoadBalance 主要的算法就是 select ,这里假如 upstream 为空,则返回 null ,假如只有一个就直接返回,假如有多个就调用 doSelect ,这里是不是和插件的 execute 和 doExecute 很相似。

        @Override
        public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
            if (CollectionUtils.isEmpty(upstreamList)) {
                return null;
            }
            if (upstreamList.size() == 1) {
                return upstreamList.get(0);
            }
            return doSelect(upstreamList, ip);
        }
    

    这里还有一个很重要的方法就是获取权重:这里并不是简单的返回权重,而是根据启动时间,预热时间,和设置的权重计算出真正的权重 主要公式是 :((float) uptime / ((float) warmup / (float) weight)); 它主要是放置刚启动的服务负载过大,需要一个预热过程,而且这个分支是只有启动时间小于预热时间才会进入,但是由于 soul 目前上传的 timestamp 一直是 0,这个过程不起作用,所以真正没有使用到。(这里实现和 dubbo 很像)

        protected int getWeight(final DivideUpstream upstream) {
            if (!upstream.isStatus()) {
                return 0;
            }
            // 获取权重
            int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());
            return weight;
        }
    
        private int getWeight(final long timestamp, final int warmup, final int weight) {
            if (weight > 0 && timestamp > 0) {
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                if (uptime > 0 && uptime < warmup) {
                    return calculateWarmupWeight(uptime, warmup, weight);
                }
            }
            return weight;
        }
    
        private int getWarmup(final int warmup, final int defaultWarmup) {
            if (warmup > 0) {
                return warmup;
            }
            return defaultWarmup;
        }
    
        private int calculateWarmupWeight(final int uptime, final int warmup, final int weight) {
            // 运行时间 / 预热时间  防止刚启动 负载就大量增加
            //换算成数据公式: weight(设置的权重) * uptime(运行时间)/warmup(预热时间)
            //小计:预热时间越长,重新计算的权重就越接近设置的权重
            int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
            return ww < 1 ? 1 : (ww > weight ? weight : ww);
        }
    

    1.11.3 RandomLoadBalance

    我们先看它的 doSelect 方法,这里主要的逻辑是假如所有的 upstream 的权重都是一样的,那么就可以直接通过随机数选择其中一个,假如不是则根据权重进行随机选择,我们侧重看一下 random(totalWeight, upstreamList);

        @Override
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            int totalWeight = calculateTotalWeight(upstreamList);
            boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
            if (totalWeight > 0 && !sameWeight) {
                return random(totalWeight, upstreamList);
            }
            // If the weights are the same or the weights are 0 then random
            return random(upstreamList);
        }
    

    这里引入了一个 offset 的概念,即落入某个区间,offset 每减一次权重,就是减去 totalweight 的权重中上一个区间的长度,那么假如 offset 小于0 这代表随机数落在这个区间上,不是则看下一个区间。

        private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
            // If the weights are not the same and the weights are greater than 0, then random by the total number of weights
            int offset = RANDOM.nextInt(totalWeight);
            // Determine which segment the random value falls on
            for (DivideUpstream divideUpstream : upstreamList) {
                offset -= getWeight(divideUpstream);
                if (offset < 0) {
                    return divideUpstream;
                }
            }
            return upstreamList.get(0);
        }
    

    1.11.4 HashLoadBalance

    我们还是从 doSelect 方法出发,soul 会为所有的 upstream 根据下面的公式 "SOUL-" + address.getUpstreamUrl() + "-HASH-" + i 得到一个String 然后生产对应的 hash 值,这里每个 upstream 会生成 5个,然后放置在 ConcurrentSkipListMap 中,然后根据 ip 生成对应的 hash ,然后从 ConcurrentSkipListMap 找到第一个大于 这个ip 对应的 hash 值,假如没有则返回第一个,这个功能可以在多集群中实现会话保持的能力,因为每个ip 它对应的服务器每次都是确定的。

    
        @Override
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
            for (DivideUpstream address : upstreamList) {
                for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                    long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
                    treeMap.put(addressHash, address);
                }
            }
            long hash = hash(String.valueOf(ip));
            SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
            if (!lastRing.isEmpty()) {
                return lastRing.get(lastRing.firstKey());
            }
            return treeMap.firstEntry().getValue();
        }
    

    RoundRobinLoadBalance

    RoundRobinLoadBalance 是这三种算法中实现最复杂的,我们先来看一下它的 doSelect 方法:

        @Override
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            String key = upstreamList.get(0).getUpstreamUrl();
            ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map == null) {
                methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
                map = methodWeightMap.get(key);
            }
            int totalWeight = 0;
            long maxCurrent = Long.MIN_VALUE;
            long now = System.currentTimeMillis();
            DivideUpstream selectedInvoker = null;
            WeightedRoundRobin selectedWRR = null;
            for (DivideUpstream upstream : upstreamList) {
                String rKey = upstream.getUpstreamUrl();
                WeightedRoundRobin weightedRoundRobin = map.get(rKey);
                int weight = getWeight(upstream);
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setWeight(weight);
                    map.putIfAbsent(rKey, weightedRoundRobin);
                }
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    weightedRoundRobin.setWeight(weight);
                }
                long cur = weightedRoundRobin.increaseCurrent();
                weightedRoundRobin.setLastUpdate(now);
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = upstream;
                    selectedWRR = weightedRoundRobin;
                }
                totalWeight += weight;
            }
            if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                    newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
            if (selectedInvoker != null) {
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
            // should not happen here
            return upstreamList.get(0);
        }
    

    这里引入了一个数据结构 WeightedRoundRobin ,它主要有三个属性 weight 该upstream 对应的权重,current 当前的权重,lastUpdate 最后更新时间。我们结合实例来解析该原理,如下是两个 upstream weight 为 50% 的轮询过程。这里先使用 for 循环遍历所有的 upstream ,然后 current值设置为 upstream 对应的 WeightedRoundRobin 加上它本身的权重,如下第一次从 0 到 50 ,然后从中选出最大的 current 值。当选中为这才调用的 upstream ,那么它的 current 会被减少 totaolWeight ,即就是途中 50 - 100 为 -50 ,然后下一次轮询又加上当前的权重50 就变成了0, 而另外一个 就是 50 + 50 变成了 100 。


    第一次
    第二次
    第三次

    1.11.5 总结

    如上三种也是我们常用的三种轮询算法,我们从中可以看出 dubbo 的影子,三种轮询算法实现 包括 wramup 机制实现基本一致。

    相关文章

      网友评论

          本文标题:[Soul 源码之旅] 1.11 Soul的负载均衡

          本文链接:https://www.haomeiwen.com/subject/pmzstltx.html