美文网首页Java 程序员程序员Java
从源码角度搞懂 Ribbon 的负载策略

从源码角度搞懂 Ribbon 的负载策略

作者: 程序花生 | 来源:发表于2021-08-10 15:13 被阅读0次

    前言

    Ribbon 是 Netflix公司的一个开源项目,现已被收录到 SpringCloud ,是一个基于 HTTP 和 TCP 的客户端负载均衡器,当我们将 Ribbon 与 Eureka一起使用时,Ribbon会从Eureka注册中心去获取服务端列表,通过轮询方式达到负载均衡的作用,客户端负载均衡中用心跳机制去维护服务端清单的有效性,这个过程需要配合服务注册中心一起完成。

    什么是负载均衡?

    负载均衡是我们处理高并发、缓解网络压力和进行服务端扩容的重要手段之一,但是一般情况下我们所说的负载均衡通常都是指服务端负载均衡,负载均衡又分为两种,还有一种是客户端负载均衡。

    Ribbon 与 Nginx 的区别?

    Ribbon 是客户端负载均衡器,而 Nginx 是服务端负载均衡器。

    客户端负载指的是 client 有要调用的服务实例清单,比如 eureka/nacos 存储各服务实例信息,而对于其中集成的 Ribbon 来说,从已知的服务列表通过某种策略选取一个实例负载,这就是客户端负载均衡,即在客户端进行负载均衡算法分配。

    服务端负载指的是 client 不知道调用哪个 server 实例,发送请求后,通过服务端的负载均衡算法,在多个服务端之间选择一个进行访问,即在服务端进行负载均衡算法分配。

    客户端与服务端负载均衡的区别实际上是服务清单所存储的位置,在客户端负载均衡中,所有client有一份要访问的服务端清单地址。

    ribbon负载策略

    七种负载均衡策略类图如下:

    负载均衡接口
    com.netflix.loadbalancer.IRule,由抽象类 AbstractLoadBalancerRule实现 IRule 接口,各个策略都是抽象类的具体实现。

    轮询策略 -RoundRobinRule

    下面看看 RoundRobinRule 类的源码是如何实现的。

    public class RoundRobinRule extends AbstractLoadBalancerRule {
    
        private AtomicInteger nextServerCyclicCounter;
    
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }
    
            Server server = null;
            // 用于计算负载均衡器尝试获取可用服务器的次数
            int count = 0;
            // 共尝试10次,超过则负载失败
            while (server == null && count++ < 10) {
                // 获取所有可达服务器
                List<Server> reachableServers = lb.getReachableServers();
                // 获取所有服务器
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
    
                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
                // 自旋锁计算出下一个负载的服务器
                int nextServerIndex = incrementAndGetModulo(serverCount);
                // 取出下一个负载的服务器
                server = allServers.get(nextServerIndex);
                // 如没有此服务器,当前线程让出CPU,置为就绪状态
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
            // 获取负载服务器会尝试10次,超过10次警告
            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }
    
        // 取模运算并使用CAS机制更新下一个负载的服务器
        private int incrementAndGetModulo(int modulo) {
            for (;;) {
                // 获取原子属性值
                int current = nextServerCyclicCounter.get();
                // 取模运算
                int next = (current + 1) % modulo;
                // CAS机制更新标识服务器循环计数器
                if (nextServerCyclicCounter.compareAndSet(current, next))
                    return next;
            }
        }
    }
    

    轮询通过模运算计算出负载机器的索引,根据索引从存放所有服务器的 list中取出作为负载服务器。其中,使用原子类 AtomicInteger + CAS 机制来记录下一个负载的服务器标识,保证了线程安全。

    随机策略 - RandomRule

    随机策略是指随机选择服务器实例进行负载,使用 ThreadLocalRandom 方式获取随机数,保证线程安全。

    public class RandomRule extends AbstractLoadBalancerRule {
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
    
            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                // 获取可达服务器和所有服务器
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
    
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                // 取随机数
                int index = chooseRandomInt(serverCount);
                server = upList.get(index);
    
                if (server == null) {
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive()) {
                    return (server);
                }
    
                server = null;
                Thread.yield();
            }
    
            return server;
    
        }
    
        // 取随机数
        protected int chooseRandomInt(int serverCount) {
            return ThreadLocalRandom.current().nextInt(serverCount);
        }
    }
    

    重试策略 - RetryRule

    先按照轮询负载策略获取服务实例,如果获取失败则在指定时间内(默认500ms)进行重试,循环调用轮询策略获取实例。

    使用 InterruptTask 开启了一个 Timer 守护线程,用来延迟执行指定的任务,在重试时间范围内循环调用轮询策略获取服务器,如超过指定重试时间后仍未获取到服务器信息,则返回 null

    public class RetryRule extends AbstractLoadBalancerRule {
    
        public Server choose(ILoadBalancer lb, Object key) {
           long requestTime = System.currentTimeMillis();
           long deadline = requestTime + maxRetryMillis;
    
           Server answer = null;
    
           // 调用轮询策略
           answer = subRule.choose(key);
    
           // 如果轮询策略没获取到服务器 || 服务器未激活 && 在指定的最大重试时间内
           if (((answer == null) || (!answer.isAlive()))
                 && (System.currentTimeMillis() < deadline)) {
    
              // 开启守护线程,监视剩余指定的重试时间
              InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());
    
              // 在指定的重试时间范围内,当前线程如没中断,循环调用轮询策略
              while (!Thread.interrupted()) {
                 answer = subRule.choose(key);
    
                 if (((answer == null) || (!answer.isAlive()))
                       && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                 } else {
                    break;
                 }
              }
    
              task.cancel();
           }
    
           if ((answer == null) || (!answer.isAlive())) {
              return null;
           } else {
              return answer;
           }
        }
    }
    

    加权响应时间 - WeightedResponseTimeRule

    WeightedResponseTimeRule 类继承了轮询策略类 RandomRule

    初始化时,启动一个定时器,每隔 30s 根据服务的响应时间分配一次权重,响应时间越长,权重越低,被选择到的概率也越低。响应时间越短,权重越高,实例被选中概率越高。得到权重后,生成随机权重,命中权重比随机权重大的第一个服务实例。

    public class WeightedResponseTimeRule extends RoundRobinRule {
    
        // 每隔 30s 统计各服务权重
        public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
    
        // 记录累计权重
        private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
    
        // 初始化
        void initialize(ILoadBalancer lb) {        
            if (serverWeightTimer != null) {
                serverWeightTimer.cancel();
            }
            serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                    + name, true);
            // 统计各服务权重
            serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                    serverWeightTaskTimerInterval);
            // do a initial run
            ServerWeight sw = new ServerWeight();
            sw.maintainWeights();
    
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    logger
                            .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                    + name);
                    serverWeightTimer.cancel();
                }
            }));
        }
    
        @Override
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
    
            while (server == null) {
                List<Double> currentWeights = accumulatedWeights;
                // 判断线程是否中断
                if (Thread.interrupted()) {
                    return null;
                }
                // 获取服务器列表
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }
                int serverIndex = 0;
    
                // currentWeights.size() - 1 是所有权重的总和
                double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
    
                // 未命中任何服务器就调用轮询策略获取
                if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                    server =  super.choose(getLoadBalancer(), key);
                    if(server == null) {
                        return server;
                    }
                } else {
                    // 从 0 到 所有权重总和之间获取随机数作为随机权重
                    double randomWeight = random.nextDouble() * maxTotalWeight;
                    int n = 0;
                    // 命中权重比随机权重大的第一个服务实例
                    for (Double d : currentWeights) {
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        } else {
                            n++;
                        }
                    }
    
                    server = allList.get(serverIndex);
                }
    
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive()) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
            return server;
        }
    }
    
    // 内部类
    class ServerWeight {
    
        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }
    
            if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
                return; 
            }
    
            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                double totalResponseTime = 0;
                // 计算出所有服务实例累计的平均响应时间
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // 记录累计权重
                Double weightSoFar = 0.0;
    
                // 存放所有服务的权重
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    // 每个服务权重 = 所有服务的平均响应时间总和 - 当前服务的平均响应时间
                    // 所以服务的响应时间越大,权重越小,被选中的可能性越小
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }
    
        }
    }
    

    例如:现在有三个服务实例,平均响应时间分别为:

    • A:100ms
    • B:200ms
    • C:300ms

    则权重分别是:

    • A:600-100 = 500
    • B:500+600-200 = 900
    • C:900+600-300 = 1200

    生成的随机数若在 0-500 之间,则命中服务 A,如在 500 - 900 之间,则命中服务 B,如在 900 - 1200,则命中服务 C,如果没有命中任何服务实例,则取轮询策略的结果。

    最佳可用策略 - BestAvailableRule

    如未指定负载均衡器,采用轮询策略选取一个服务实例;

    如指定了负载均衡器,逐个考察服务实例,过滤掉断路器跳闸状态的实例,从未过滤掉的实例中选择一个并发量最小的实例。如果未命中,则轮询策略选取一个服务实例。

    public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
    
        @Override
        public Server choose(Object key) {
            // 未指定负载均衡器,调用轮询策略
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            // 获取所有服务器列表
            List<Server> serverList = getLoadBalancer().getAllServers();
            // 最小并发连接数
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            // 遍历服务器列表
            for (Server server: serverList) {
                // 获取服务器统计信息
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                // 如果服务器断路器没有发生断路器跳闸,过滤掉断路器跳闸的实例
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    // 选择并发量最小的实例
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            // 如未命中,轮询选取一个实例
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
    }
    

    可用性过滤策略 - AvailabilityFilteringRule

    该策略继承自抽象策略 PredicateBasedRule 类。

    通过轮询的方式选取一个服务,如果不匹配过滤条件,则继续轮询10次,如果10次还未命中,就轮询选取一个实例。

    过滤条件:断路器故障或者并发请求超过了设置的并发阈值

    public class AvailabilityFilteringRule extends PredicateBasedRule {  
    
        @Override
        public Server choose(Object key) {
            int count = 0;
            // 轮询策略选一个实例
            Server server = roundRobinRule.choose(key);
    
            while (count++ <= 10) {
                // 判断是否符合断言条件
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                // 不满足断言条件再轮询选择一个实例
                server = roundRobinRule.choose(key);
            }
            // 超过10次还不满足,使用 父类 `PredicateBasedRule`策略
            return super.choose(key);
        }
    }
    

    看看父类 PredicateBasedRule 的负载策略

    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
    
        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            // 根据条件过滤后,采用轮询策略选取实例
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }       
        }
    }
    

    来看看上述中的断言条件是什么,进入到AvailabilityPredicate类查看断言条件

    public class AvailabilityPredicate extends  AbstractServerPredicate {
    
        @Override
        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }
    
        private boolean shouldSkipServer(ServerStats stats) {
            // 以下两个条件满足其一就过滤实例
            // 1、断路器开启并且故障
            // 2、实例的并发请求>=阈值
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }
    }
    

    区域回避策略 - ZoneAvoidanceRule

    继承自 PredicateBasedRule

    public class ZoneAvoidanceRule extends PredicateBasedRule {
    
        private static final Random random = new Random();
    
        private CompositePredicate compositePredicate;
    
        public ZoneAvoidanceRule() {
            super();
            // 两个过滤条件
            ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
            AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
            compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
        }
    }
    

    两个断言条件

    public class ZoneAvoidancePredicate extends  AbstractServerPredicate {
        @Override
        public boolean apply(@Nullable PredicateKey input) {
            if (!ENABLED.get()) {
                return true;
            }
            String serverZone = input.getServer().getZone();
            if (serverZone == null) {
                // there is no zone information from the server, we do not want to filter
                // out this server
                return true;
            }
            LoadBalancerStats lbStats = getLBStats();
            if (lbStats == null) {
                // no stats available, do not filter
                return true;
            }
            if (lbStats.getAvailableZones().size() <= 1) {
                // only one zone is available, do not filter
                return true;
            }
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            if (!zoneSnapshot.keySet().contains(serverZone)) {
                // The server zone is unknown to the load balancer, do not filter it out 
                return true;
            }
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            // 获取可用区域
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null) {
                return availableZones.contains(input.getServer().getZone());
            } else {
                return false;
            }
        }   
    }
    

    此过滤条件就是 AvailabilityFilteringRule策略的过滤条件。

    public class AvailabilityPredicate extends  AbstractServerPredicate {
    
        @Override
        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }
    
        // 以下两个条件满足其一就过滤实例 
        // 1、断路器开启并且故障 
        // 2、实例的并发请求>=阈值
        private boolean shouldSkipServer(ServerStats stats) {        
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }
    }
    

    小结

    本文主要从源码角度分析了ribbon的七个负载均衡策略,如对 MySQL,Spring 等感兴趣请继续关注。

    作者:沸羊羊
    链接:https://juejin.cn/post/6994621608610496542
    来源:掘金

    相关文章

      网友评论

        本文标题:从源码角度搞懂 Ribbon 的负载策略

        本文链接:https://www.haomeiwen.com/subject/bpaxbltx.html