美文网首页
soul网关学习16-插件实现1-Divide2-负载均衡算法

soul网关学习16-插件实现1-Divide2-负载均衡算法

作者: niuxin | 来源:发表于2021-01-30 07:31 被阅读0次

    接着上篇的文章来。

    负载均衡算法

    • 代码出处,使用了一个工具类完成了负载均衡后端节点的选取
    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
    
    • 看下这里的源码
        public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
            LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
            return loadBalance.select(upstreamList, ip);
        }
    
    • 可以看出这里是通过SPI的机制,根据传入算法名称,运行时动态得到具体的负载均衡实现,由扩展加载器加载到的对应的负载均衡实例
    • 先看看这里的SPI扩展点机制,然后再去看具体的负载均衡算法

    SPI扩展点机制

    • org.dromara.soul.spi.ExtensionLoader关键代码
        public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("extension clazz is null");
            }
            if (!clazz.isInterface()) {
                throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");
            }
            if (!clazz.isAnnotationPresent(SPI.class)) {
                throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");
            }
            ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);
            if (extensionLoader != null) {
                return extensionLoader;
            }
            LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz));
            return (ExtensionLoader<T>) LOADERS.get(clazz);
        }
    
    • 使用SPI机制是soul提供出来的扩展点,如果使用用户想要实现一套自己的负载均衡算法,则只要根据soul的规范,并做相应的配置就可以完成了。
      // TODO 写一个简单例子演示
    • 我们了解下 扩展点加载器ExtensionLoader,该加载器参考了dubbo中的ExtensionLoader,为其缩减版;对比之下,主要减少了依赖注入功能。跟jdk原生的SPI机制相比,实现了按需加载,因jdk会一次性将所有的扩展实现一次性加载的。
    • 关于SPI扩展点机制
    • soulSPI扩展点机制就先介绍到这里,我们看下soul提供的几种负载均衡算法

    负载均衡算法实现

    • 从源码得出,负载均衡算法实现默认🈶️3种randomroundRobin,hash
      负载均衡算法实现

    random-加权随机

    1. 看代码实际上是一个加权的随机算法
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            // 计算所有后端节点的权重值
            int totalWeight = calculateTotalWeight(upstreamList);
            // 判断是否所有的节点为相同权重的情况
            boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
            if (totalWeight > 0 && !sameWeight) {
                // 不是,则要加权随机
                return random(totalWeight, upstreamList);
            }
            // If the weights are the same or the weights are 0 then random
            // 如果各节点权重都相同,则按照总的节点数N,生成一个1-N之间的随机数X,然后选取出该X节点
            return random(upstreamList);
        }
    
    1. 加权随机的实现稍稍复杂点,看下源码
        private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
            // If the weights are not the same and the weights are greater than 0, then random by the total number of weights
            // 先拿到随机数为区间,为1-总权重值N之间
            int offset = RANDOM.nextInt(totalWeight);
            // Determine which segment the random value falls on
            // 遍历所有的节点,每次循环都用区间=区间-当前权重值,然后再看区间是否小于0;
            // 若小于0,则证明刚好位于当前节点的区间,返回当前节点;若没有,则继续循环
            for (DivideUpstream divideUpstream : upstreamList) {
                offset -= getWeight(divideUpstream);
                if (offset < 0) {
                    return divideUpstream;
                }
            }
            // 最后返回第一个兜底
            return upstreamList.get(0);
        }
    

    roundRobin-加权轮询

    1. 加权轮询的算法会有些难懂,我们先看下关键方法doSelect
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            //TODO question 如果这批upstream第一个节点经常变更,可能会导致前面节点生成的WeightedRoundRobin数据无法释放
            String key = upstreamList.get(0).getUpstreamUrl();
            // 取出此批后端服务的权重对象 key -> 后端server,为ip:port   value -> 后端server对应的权重对象
            ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
            if (map == null) {
                methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
                map = methodWeightMap.get(key);
            }
            // 总权重值
            int totalWeight = 0;
            // 当前请求权重的最大值,每次请求都会置为MIN_VALUE
            long maxCurrent = Long.MIN_VALUE;
            // 当前系统时间
            long now = System.currentTimeMillis();
            // 选中节点
            DivideUpstream selectedInvoker = null;
            // 选中节点的权重
            WeightedRoundRobin selectedWRR = null;
            //TODO question 轮询这里为什么没有break,应该找到选中节点后就跳出循环,不需要继续循环下去
            for (DivideUpstream upstream : upstreamList) {
                // rkey -> 后端server,ip:port
                String rKey = upstream.getUpstreamUrl();
                // 后端server对应的权重对象
                WeightedRoundRobin weightedRoundRobin = map.get(rKey);
                // 当前后端节点的权重值
                int weight = getWeight(upstream);
                // 如果当前后端server权重对象不存在,则需生成当前后端server的权重对象,并添加到内存
                if (weightedRoundRobin == null) {
                    weightedRoundRobin = new WeightedRoundRobin();
                    weightedRoundRobin.setWeight(weight);
                    map.putIfAbsent(rKey, weightedRoundRobin);
                }
                // 节点权重值发生了变化,则需要重新设置
                if (weight != weightedRoundRobin.getWeight()) {
                    //weight changed
                    // 权重值发生变化后,将内存中后端server权重更新为最新权重值
                    // 同时其内部current值也将更新为0,回到初始状态
                    weightedRoundRobin.setWeight(weight);
                }
                // 内部序列从0开始增,每次进来都+自己的权重值
                long cur = weightedRoundRobin.increaseCurrent();
                // 重新设置更新时间
                weightedRoundRobin.setLastUpdate(now);
                // 如果节点当前轮询权重大于此次调用的最大权重值,则满足项,可以作为选中节点
                // 这里选中节点后,并不会停止循环,会遍历所有的节点,将最后一个满足条件的节点作为选中节点
                // 这里是为了找到当前权重最大的那个后端server,将其作为选中节点
                if (cur > maxCurrent) {
                    maxCurrent = cur;
                    selectedInvoker = upstream;
                    selectedWRR = weightedRoundRobin;
                }
                // 总权重
                totalWeight += weight;
            }
            // 更新标志锁位未更新  后端节点数不等于原有节点数  采用compareAndSet取锁,获取到锁之后才更新
            if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    // 新的map
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                    // 如果从轮询开始,到结束,中间间隔了1min钟,则需要移除该后端server的权重对象数据
                    // 这里是为了移除掉那些下线的节点,因为只要有被轮询,其LastUpdate=now;而超过1min钟,都没有轮询到的,则就是已下线的节点
                    newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
                    // 将新的后端节点的权重数据替换旧有的
                    methodWeightMap.put(key, newMap);
                } finally {
                    // 最终将锁的标志为设置为false
                    updateLock.set(false);
                }
            }
            // 选中节点的处理
            if (selectedInvoker != null) {
                // 选中节点的内部计数的当前权重current要减去总的权重值,降低其当前权重,也就意味着其下次被选中的优先级被降低
                selectedWRR.sel(totalWeight);
                return selectedInvoker;
            }
            // should not happen here
            return upstreamList.get(0);
        }
    
    1. 因不懂其思想,在debug源码过程中,将关键变量变化的过程用表格记录了下来,方便跟踪逻辑
      RoundRobinComputeProcess
    2. 从上图可以看出:当权重为50:50时,4次调用其最终选取出来的结果为1:1;当权重为10:20时,6次调用其最终选中出来的结果为1:2,前3次调用最终选取出来的结果也为1:2。
    3. //TODO很是神奇,不知晓其中原理
    4. 实现逻辑看起来比较复杂,简单说来就是 对于每个 DivideUpstream 都维护一个权重对象,每次遍历所有权重对象获取到权重最大的那个 DivideUpstream,同时减去被选中的 DivideUpstream 的权重(减去总权重,优先级降为最低)

    5. 参考:【高性能网关soul学习】11. 插件之DevidePlugin

    hash-IP哈希

    • 实际上是基于远程客户端IP的一个hash负载均衡算法
    • doSelect的源码
        public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
            // 并发跳表map key->后端server的hash值 value->后端节点
            final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
            for (DivideUpstream address : upstreamList) {
                // 每个实际后端节点会生成5个虚拟节点,用于更大范围映射,类似于一致性hash
                for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                    // 后端server的hash
                    long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
                    treeMap.put(addressHash, address);
                }
            }
            // 远程客户端ip的hash
            long hash = hash(String.valueOf(ip));
            // 如果 请求地址的Hash值 右边存在节点,返回右边第一个
            // treeMap.tailMap(hash) 会返回所有(key >= hash)的映射集合,同时是有序的。
            SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
            // 如果找到了,则从找到的entry拿出后端节点
            if (!lastRing.isEmpty()) {
                // 这里也就是取 url hash 值右边的第一个节点
                return lastRing.get(lastRing.firstKey());
            }
            // 没找到,则直接去第一个节点兜底
            return treeMap.firstEntry().getValue();
        }
    

    总结

    1. Divide插件这块的内容还挺多的,后面还需要将负载均衡这块的逻辑重点看一看,搞清楚roundRobinhash算法的实现,同时也去看看其他框架中这两种算法的实现
    2. 已分析Divide插件中重点块:后端节点的探活机制负载均衡算法
    3. 后续还有http服务的请求转发websocket服务的支持
    4. 关于负载均衡分析的好文章

    To be continued...

    相关文章

      网友评论

          本文标题:soul网关学习16-插件实现1-Divide2-负载均衡算法

          本文链接:https://www.haomeiwen.com/subject/fvsctltx.html