美文网首页
Spring Cloud 客户端负载均衡

Spring Cloud 客户端负载均衡

作者: 蜀山_竹君子 | 来源:发表于2020-12-22 20:29 被阅读0次

    负载均衡简介

    负载均衡是集群技术的一种应用,通过负载技术将工作任务平衡、分摊到多个操作单元运行。负载均衡建立在网络结构之上,是提供系统高可用性、处理能力以及缓解网络压力的重要手段。

    负载均衡分类

    • 服务端负载:硬件负载均衡和软件负载均衡
    • 客户端负载

    服务端负载均衡和客户端负载均衡区别?

    • 基于服务端实现的负责均衡(无论软硬件),都会在负载均衡设备(软硬件)下挂服务端清单并通过心跳检测来剔除故障节点以保证清单中都是可以正常访问的服务端节点。
      无论是软件负载还是硬件负载都能够基于类似下述架构方式进行构建。
    服务端负载
    • 客户端负载均衡中,客户端节点都维护者自己所需要访问的服务清单,而这些清单来自服务注册中心。

    负载均衡部署方式

    • 路由模式
    • 桥接模式
    • 服务器直连模式

    负载均衡算法

    • 静态:以固定概率分配任务,不考虑服务器状态信息,比如轮询、加权轮询算法等。
    • 动态:以服务器实时负载状态信息来决定任务分配,如最小连接法、加权最小连接法。
    • 轮询算法
      将用户请求轮流分配给服务器。轮询算法实现简单,具有绝对均衡优点。缺点是无法保证任务分配的合理性,无法根据服务器承受能力以及服务器状态来分配任务。
    • 加权轮询算法
      根据服务器处理能力,给每个服务器赋予不同权值使其能够接受不同权值数的服务请求。
    • 随机法
      随机选择一台服务器执行任务,随机法是没有状态的,不需要维护上次的选择状态和均衡因子。随机法保证了请求任务的分散性达到均衡的目的。随机法随着任务数的增大,效果趋向轮询法,也具有轮询的缺点。
    • 最小连接数法
      将请求分配给当前具有最小连接数据的服务器,是动态负载算法。一个节点收到任务请求后,节点权值会+1,节点不可用时,节点权值设置为0不再分配任务。最小连接法适用于服务节点处理性能相似,当服务节点性能差异较大无法达到预期结果。

    负载均衡技术

    • 基于DNS的负责均衡技术
    • 反向代理:
    • 基于NAT(NetWork Adress Transaction)
    • Ribbion 客户端负载均衡实现
      SpringCloud Ribbion 客户端负载均衡 是Spring Cloud Netflix 子项目核心项目,是基于Http和TCP的客户端负载工具,主要给服务端调用及API网关转发提供负载均衡功能。Spring Cloud Ribbon是一种工具栏框架无需独立部署运行,而是集成于其他项目配套使用。
      通过Spring Cloud Ribbon的封装,在Spring Cloud 微服务架构中实现客户端负载均衡非常简单:

    客户端负载均衡

    Spring Cloud Ribbon 客户端负载 快速集成

    • step1 引入ribbon依赖
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
    </dependency>
    

    另外Spring Cloud Alibaba 已经内置Ribbon,如果使用Nacos做服务治理,就无需再单独引入netflix-ribbon依赖。

    • step2 创建RestTemplate Bean并开启负载均衡功能
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
       return new RestTemplate();
    }
    
    • step3 使用RestTemplate来实现服务端接口调用
    restTemplate.getForEntity("http://mall-provider/hello", String.class).getBody();
    

    RestTemplate详解

    RestTemplate是HTTP客户端,通过RestTemplate使我们可以简单方便的实现http接口调用,支持GET、PUT、POST、DELETE。

    GET方法

    RestTemplate对GET方法提供了两种函数调用方式:
    第一种是getForEntity函数,返回ResponseEntity<T>对象。该对象是Spring对HTTP请求响应的封装,存储了HTTP的几个重要元素,比如HTTP请求的状态码枚举对象HTTPStatus(200,300,400,500等)、响应头、响应体等。
    getForEntity函数提供了三种不同重载实现:

    <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables);
    <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables);
    <T> ResponseEntity<T> getForEntity(URI var1, Class<T> responseType);
    

    代码举例如下:

    return restTemplate.getForEntity("http://mall-provider/hello", String.class).getBody();
    ////////////////
    @GetMapping("/getEntityByUsername")
    public CommonResult getEntityByUsername(@RequestParam String username) {
        ResponseEntity<CommonResult> entity = restTemplate.getForEntity(userServiceUrl + "/user/getByUsername?username={1}", CommonResult.class, username);
        if (entity.getStatusCode().is2xxSuccessful()) {
            return entity.getBody();
        } else {
            return new CommonResult("操作失败", 500);
        }
    }
    

    第二种是getForObject,可以理解为对getForEntity的进一步封装,Spring通过HttpMessageConverterExtractor对HTTP响应体内容进行对象转换,实现直接返回包装好的对象内容。
    getForObject同样提供了三种重载实现:

     <T> T getForObject(String url, Class<T> responseType, Object... uriVariables);
    
    <T> T getForObject(String url, Class<T> responseType, Map<String, ?> uriVariables);
    
    <T> T getForObject(URI url, Class<T> responseType);
    

    代码实现举例:

     @RequestMapping(value = "/consumer-index", method = RequestMethod.GET)
    public String index() {
        //return restTemplate.getForEntity("http://mall-provider/hello", String.class).getBody();
        return restTemplate.getForObject("http://mall-provider/hello", String.class);
    }
    

    POST方法

    RestTemplate对POST提供了三种函数调用方式:postforEntity、postForObject、postForLocation。

     <T> T postForObject(String url, @Nullable Object request, Class<T> responseType, Object... uriVariables);
    
    <T> T postForObject(String url, @Nullable Object request, Class<T> responseType, Map<String, ?> uriVariables);
    
    <T> T postForObject(URI url, @Nullable Object request, Class<T> responseType);
    
    <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request, Class<T> responseType, Object... uriVariables);
    
    <T> ResponseEntity<T> postForEntity(String url, @Nullable Object request, Class<T> responseType, Map<String, ?> uriVariables);
    
    <T> ResponseEntity<T> postForEntity(URI url, @Nullable Object request, Class<T> responseType);
    

    //实现以post方式提交资源,并返回新资源的URI

    public URI postForLocation(URI url, @Nullable Object request);
    
    public URI postForLocation(String url, @Nullable Object request, Map<String, ?> uriVariables);
    
    public URI postForLocation(String url, @Nullable Object request, Object... uriVariables);
    

    代码实现举例:

     //postForEntity
    @PostMapping("/create")
    public CommonResult create(@RequestBody User user) {
        return restTemplate.postForEntity(userServiceUrl + "/user/create", user, CommonResult.class);
    }
    //postForObject
    @PostMapping("/create")
    public CommonResult create(@RequestBody User user) {
        return restTemplate.postForObject(userServiceUrl + "/user/create", user, CommonResult.class);
    }
    

    PUT方法

     void put(String url, @Nullable Object request, Object... uriVariables);
    
    void put(String url, @Nullable Object request, Map<String, ?> uriVariables);
    
    void put(URI url, @Nullable Object request);
    
    @PutMapping("/update")
    public CommonResult update(@RequestBody User user) {
        restTemplate.put(userServiceUrl + "/user/update", user);
        return new CommonResult("操作成功",200);
    }
    

    DELETE方法

     void delete(String url, Object... uriVariables);
    
    void delete(String url, Map<String, ?> uriVariables);
    
    void delete(URI url);
    
    @DeleteMapping("/delete/{id}")
    public CommonResult delete(@PathVariable Long id) {
       restTemplate.delete(userServiceUrl + "/user/delete/{1}", null, id);
       return new CommonResult("操作成功",200);
    }
    

    源码分析

    Spring Cloud Ribbon 源码分析

    从上述Spring Cloud客户端负载均衡实现示例来看,要使用客户端负载均衡除了引入Ribbon依赖,最重要的就是使用被@LoadBalanced注解修饰的RestTemplate。RestTemplate是Spring提供的HTTP封装实现,@LoadBalanced注解是Spring Cloud提供接口,通过该注解标记RestTemplate以使用负载均衡客户端(LoadBalancerClient)来配置RestTemplate。

     public interface ServiceInstanceChooser {
        //根据服务名从负载均衡中获取对应服务实例
        ServiceInstance choose(String serviceId);
    }
    
    public interface LoadBalancerClient extends ServiceInstanceChooser {
        //根据服务名从负载均衡中获取对应服务实例来执行请求
        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
        //根据传入的服务实例执行请求内容
        <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
        //构建一个合适的host:post形式的uri
        URI reconstructURI(ServiceInstance instance, URI original);
    }
    

    继续分析Spring Cloud Ribbon源码,可以发现Spring Cloud Ribbon并未提供实际的负载均衡实现,而是通过自动化装配注解类LoadBalancerAutoConfiguration,为RestTemplate增加LoadBalancerInterceptor拦截器,当被@LoadBalanced修饰的RestTemplate对外发起Http请求时,会被LoadBalancerInterceptor拦截。

    Spring Cloud Ribbon 负载均衡自动化配置类关系图
    负载均衡自动化配置实现LoadBalancerAutoConfiguration
    @Configuration(
      proxyBeanMethods = false
    )
    //Spring Cloud Ribbon实现负载均衡自动化配置必要条件:
    //1. RestTemplate类必须在当前工程环境
    @ConditionalOnClass({RestTemplate.class})
    //2. 在Spring Bean 容器中必须有LoadBalancerClient的实现Bean
    @ConditionalOnBean({LoadBalancerClient.class})
    @EnableConfigurationProperties({LoadBalancerRetryProperties.class})
    public class LoadBalancerAutoConfiguration {
    
    //维护一个@LoadBalanced修饰的RestTemplate列表,并进行初始化,通过调用
    //RestTemplateCustomizer,对需要客户端负载均衡的RestTemplate添加LoadBalancerInterceptor 拦截器。
      @LoadBalanced
      @Autowired(
        required = false
      )
      private List<RestTemplate> restTemplates = Collections.emptyList();
      @Autowired(
        required = false
      )
      private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
    
      public LoadBalancerAutoConfiguration() {
      }
    
      @Bean
      public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
       //省略
      }
    
      @Bean
      @ConditionalOnMissingBean
      public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
         //省略
      }
    
      @Configuration(
        proxyBeanMethods = false
      )
      @ConditionalOnClass({RetryTemplate.class})
      public static class RetryInterceptorAutoConfiguration {
        public RetryInterceptorAutoConfiguration() {
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RetryLoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRetryProperties properties, LoadBalancerRequestFactory requestFactory, LoadBalancedRetryFactory loadBalancedRetryFactory) {
           //省略
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final RetryLoadBalancerInterceptor loadBalancerInterceptor) {
           //省略
        }
      }
    
      @Configuration(
        proxyBeanMethods = false
      )
      @ConditionalOnClass({RetryTemplate.class})
      public static class RetryAutoConfiguration {
        public RetryAutoConfiguration() {
        }
    
        @Bean
        @ConditionalOnMissingBean
        public LoadBalancedRetryFactory loadBalancedRetryFactory() {
           //省略
        }
      }
    
      @Configuration(
        proxyBeanMethods = false
      )
      @ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
      static class LoadBalancerInterceptorConfig {
        LoadBalancerInterceptorConfig() {
        }
    
        //创建LoadBalancerInterceptor 拦截器,用于实现对客户端请求进行拦截,以实现客户端负载均衡
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
          return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
    
        //创建 RestTemplateCustomizer Bean,用于给RestTemplate添加LoadBalancerInterceptor 
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
          return (restTemplate) -> {
            List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
            list.add(loadBalancerInterceptor);
            restTemplate.setInterceptors(list);
          };
        }
      }
    }
    

    LoadBalancerInterceptor 客户端负载拦截器实现

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
      private LoadBalancerClient loadBalancer;
      private LoadBalancerRequestFactory requestFactory;
    
      public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
      }
    
      public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
      }
      //拦截Http请求,并交由LoadBalancerClient处理,这里的LoadBalancerClient还是一个抽象的接口
      public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
        URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
      }
    }
    

    RibbonLoadBalancerClient负载均衡实现

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        return this.execute(serviceId, (LoadBalancerRequest)request, (Object)null);
      }
    
      public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
        //1. 根据serviceId获取具体服务实例,此时并未使用LoadBalancerClient接口的choose函数
        //而是使用ILoadBalancer 的chooseServer函数
        ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
        Server server = this.getServer(loadBalancer, hint);
        if (server == null) {
          throw new IllegalStateException("No instances available for " + serviceId);
        } else {
          RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
          return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
        }
      }
    
    public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
        Server server = null;
        if (serviceInstance instanceof RibbonLoadBalancerClient.RibbonServer) {
          server = ((RibbonLoadBalancerClient.RibbonServer)serviceInstance).getServer();
        }
    
        if (server == null) {
          throw new IllegalStateException("No instances available for " + serviceId);
        } else {
          RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
          RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
          try {
            T returnVal = request.apply(serviceInstance);
            statsRecorder.recordStats(returnVal);
            return returnVal;
          } catch (IOException var8) {
            statsRecorder.recordStats(var8);
            throw var8;
          } catch (Exception var9) {
            statsRecorder.recordStats(var9);
            ReflectionUtils.rethrowRuntimeException(var9);
            return null;
          }
        }
      }
    
    

    通过上述源码分析,可以知道Spring Cloud Ribbon 框架提供了Ribbon自动装配功能自动化配置、抽象化的通用负载均衡器LoadBalancerClient接口以及针对Ribbon实现的负载均衡器RibbonLoadBalancerClient,但是实际处理负载均衡的是ILoadBalancer接口

    负载均衡器

    ILoadBalancer 接口清单

    public interface ILoadBalancer {
            //向负载均衡器维护的实例列表中增加实例
        public void addServers(List<Server> newServers);
            //通过某种策略从负载均衡器中挑选出一个具体的服务实例
        public Server chooseServer(Object key);
            //标记和通知负载均衡器该服务实例停止服务,否则负载均衡器下次获取服务实例清单时会认为该服务是正常
        public void markServerDown(Server server);
        @Deprecated
        public List<Server> getServerList(boolean availableOnly);
            //获取当前服务正常的实例
            public List<Server> getReachableServers();
            //获取所有已知的服务实例
        public List<Server> getAllServers();
    }
    

    ILoadBalancer 定义了客户端负载均衡需要的一系列操作抽象。


    ILoadBalancer实现类关系图
    • BaseLoadBalancer:实现了基础的负载均衡,提供负载均衡策略定义、服务状态检查、存储服务实例、定义了Rule对象并初始了RoundRobinRule简单的线性负载策略等。
    • DynamicServerListLoadBalancer:在BaseLoadBalancer进行扩展,实现了动态更新服务实例以及按区分组实例的功能,但是其负载策略仍是使用最基本的线性轮询。
    • ZoneAwareLoadBalancer:在线性轮询基础上进行扩展,引入区的概念,一定程度避免了多区域部署场景,周期性的出现跨区域访问,从而导致多区域部署出现性能问题。
      源码清单
    //DynamicServerListLoadBalancer的关键代码
        //已经提出Zone的概念,将服务实例按区域分组
        @Override
        public void setServersList(List lsrv) {
            super.setServersList(lsrv);
            List<T> serverList = (List<T>) lsrv;
            Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
            for (Server server : serverList) {
                // make sure ServerStats is created to avoid creating them on hot
                // path
                getLoadBalancerStats().getSingleServerStat(server);
                String zone = server.getZone();
                if (zone != null) {
                    zone = zone.toLowerCase();
                    List<Server> servers = serversInZones.get(zone);
                    if (servers == null) {
                        servers = new ArrayList<Server>();
                        //按区域Zone分组实例列表
                        serversInZones.put(zone, servers);
                    }
                    servers.add(server);
                }
            }
            setServerListForZones(serversInZones);
        }
    
        //提供按区域更新、统计实例功能
        protected void setServerListForZones(
                Map<String, List<Server>> zoneServersMap) {
            LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
            getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
        }
        public void updateZoneServerMapping(Map<String, List<Server>> map) {
            upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>(map);
            // make sure ZoneStats object exist for available zones for monitoring purpose
            for (String zone: map.keySet()) {
                getZoneStats(zone);
            }
        }
        private ZoneStats getZoneStats(String zone) {
            zone = zone.toLowerCase();
            ZoneStats zs = zoneStatsMap.get(zone);
            if (zs == null){
                zoneStatsMap.put(zone, new ZoneStats(this.getName(), zone, this));
                zs = zoneStatsMap.get(zone);
            }
            return zs;
        }
    //ZoneAwareLoadBalancer 核心实现
        //在DynamicServerListLoadBalancer基础进行扩展,将服务实例按区域分组,并每个区域存储对应的负载均衡器
        @Override
        protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
            super.setServerListForZones(zoneServersMap);
            if (balancers == null) {
                //存储每个Zone区域对应的负载均衡器。
                balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
            }
            for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
                String zone = entry.getKey().toLowerCase();
                getLoadBalancer(zone).setServersList(entry.getValue());  //设置对应Zone区域的实例清单
            }
            //对Zone区域中的实例清单的检查,看看是否有zone区域下已经没有实例了,是的话
            //就将Balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点
            //时,防止过时的Zone区域统计信息干扰具体实例的选择算法。
            for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
                if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                    existingLBEntry.getValue().setServersList(Collections.emptyList());
                }
            }
        }    
        //实现按分区负载功能   
        @Override
        public Server chooseServer(Object key) {
            //只有当负载均衡器中维护的实例所属的Zone区域的个数大于1的时候才会执行选择策略
            //否则还是使用父类的实现
            if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
                logger.debug("Zone aware logic disabled or there is only one zone");
                return super.chooseServer(key);
            }
            Server server = null;
            try {
                LoadBalancerStats lbStats = getLoadBalancerStats();
                //为当前负载均衡器中的所有Zone区域分别创建快照,保存在zoneSnapshot中,这些快照中的数据用于后续的算法
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (triggeringLoad == null) {
                    triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
                }
     
                if (triggeringBlackoutPercentage == null) {
                    triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
                }
                //获得可用Zone区域的集合,getAvailableZones会通过zoneSnapshot实现可用区域挑选
                Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
                logger.debug("Available zones: {}", availableZones);
                if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                    //随机选择一个Zone区域
                    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                    logger.debug("Zone chosen: {}", zone);
                    if (zone != null) {
                        //获得对应区域的负载均衡器
                        BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                        //选择具体的服务实例
                        //在chooseServer中将会使用IRule接口的choose函数来选择具体服务实例。在这里,IRule接口的实现会实现ZoneAvoidanceRule来挑选具体的服务实例。
                        server = zoneLoadBalancer.chooseServer(key);
                    }
                }
            } catch (Exception e) {
                logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
            }
            if (server != null) {
                return server;
            } else {
                logger.debug("Zone avoidance logic is not invoked.");
                return super.chooseServer(key);
            }
        }
         
        //为每个区域创建负载均衡器,并设置负载均衡规则
        @VisibleForTesting
        BaseLoadBalancer getLoadBalancer(String zone) {
            zone = zone.toLowerCase();
            BaseLoadBalancer loadBalancer = balancers.get(zone);
            if (loadBalancer == null) {
                // 创建规则
                IRule rule = cloneRule(this.getRule());
                loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
                BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
                if (prev != null) {
                    loadBalancer = prev;
                }
            }
            return loadBalancer;        
        }
     
        private IRule cloneRule(IRule toClone) {
            IRule rule;
            if (toClone == null) {
                //没有IRule实例,创建AvailabilityFilteringRule实例
                rule = new AvailabilityFilteringRule();
            } else {
                String ruleClass = toClone.getClass().getName();                
                try {
                    //有就克隆一个
                    rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClass, this.getClientConfig());
                } catch (Exception e) {
                    throw new RuntimeException("Unexpected exception creating rule for ZoneAwareLoadBalancer", e);
                }
            }
            return rule;
        }
        
           
        @Override
        public void setRule(IRule rule) {
            super.setRule(rule);
            if (balancers != null) {
                for (String zone: balancers.keySet()) {
                    balancers.get(zone).setRule(cloneRule(rule));
                }
            }
        }
    }
    ///
    //ZoneAwareLoadBalancer会用到该函数
    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;
     
        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                availableZones.remove(zone);     //剔除实例个数为0的Zone区域
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    //实例平均负载小于0的区域剔除
                    //实例故障率(断路器端口次数/实例数)大于等于阈值(默认0,99999)的区域剔除。
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    //实例平均负载最差的Zone区域找到,这里最差指的是实例平均负载最高的zone区域
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }
        //如有实例的最大平均负载小于阈值(默认20%)
        //如果没有符合剔除要求的区域
        //符合这两个条件就直接返回所有Zone区域为可用区域
        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        //随机选择一个最差的区域
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            //干掉最差的区域
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;
     
    }
    
    

    通过原因可以看出DynamicServerListLoadBalancer已经提出分区的概念,并且将服务实例按区域分组进行维护,但是DynamicServerListLoadBalancer仍是将不同区域的服务实例按照同等的服务实例对待,提供简单的线性轮询功能,因此对于多区域部署应用,一定会周期性的出现跨区域访问。ZoneAwareLoadBalancerDynamicServerListLoadBalancer基础进行改进,对服务进行分组并且实现了跨区域之间的负载均衡实现,避免出现跨区域访问。

    Ribbon负载均衡策略

    Ribbon通过IRule接口来实现负载均衡策略:


    负载均衡策略实现类关系图

    功能描述

    • com.netflix.loadbalancer.RandomRule 从提供服务的实例中以随机的方式
    • com.netflix.loadbalancer.RoundRobinRule 以线性轮询的方式,就是维护一个计数器,从提供服务的实例中按顺序选取,第一次选第一个,第二次选第二个,以此类推,到最后一个以后再从头来过
    • com.netflix.loadbalancer.RetryRule 在RoundRobinRule的基础上添加重试机制,即在指定的重试时间内,反复使用线性轮询策略来选择可用实例;
    • com.netflix.loadbalancer.WeightedResponseTimeRule 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择;
    • com.netflix.loadbalancer.BestAvailableRule 选择并发较小的实例;
    • com.netflix.loadbalancer.AvailabilityFilteringRule 先过滤掉故障实例,再选择并发较小的实例;
    • com.netflix.loadbalancer.ZoneAwareLoadBalancer 采用双重过滤,同时过滤不是同一区域的实例和故障实例,选择并发较小的实例。

    Ribbon常用配置

    全局配置

     ribbon:
      ConnectTimeout: 1000 #服务请求连接超时时间(毫秒)
      ReadTimeout: 3000 #服务请求处理超时时间(毫秒)
      OkToRetryOnAllOperations: true #对超时请求启用重试机制
      MaxAutoRetriesNextServer: 1 #切换重试实例的最大个数
      MaxAutoRetries: 1 # 切换实例后重试最大次数
      NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #修改负载均衡算法
    

    指定客户端配置

     #ribbon节点挂载服务名称下面,ribbon-service调用user-service时的单独配置
    user-service:
      ribbon:
        ConnectTimeout: 1000 #服务请求连接超时时间(毫秒)
        ReadTimeout: 3000 #服务请求处理超时时间(毫秒)
        OkToRetryOnAllOperations: true #对超时请求启用重试机制
        MaxAutoRetriesNextServer: 1 #切换重试实例的最大个数
        MaxAutoRetries: 1 # 当前实例后重试最大次数
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #修改负载均衡算法
    

    从上述代码分析,基本清楚了Spring Cloud Ribbon客户端负载均衡实现原理,为我们在实际应用过程中选择和自定义合适负载均衡实现提供帮忙。

    Ribbon 客户端负载均衡高级应用

    自定义负载均衡

    除了通过修改配置文件来达到修改负载均衡策略实现外,我们还可以使用自定义负载均衡实现来满足我们实际应用过程需要的负载均衡实现,比如我们需要使用服务元数据作为负载均衡的标记。

    Spring Cloud Ribbon RibbonClientConfiguration 默认自动加载负载均衡配置实现采用@ConditionalOnMissingBean 模式初始Bean为我们自定义实现负载均衡算法提供了可能。

    @Configuration(
      proxyBeanMethods = false
    )
    @EnableConfigurationProperties
    @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
    public class RibbonClientConfiguration {
      public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
      public static final int DEFAULT_READ_TIMEOUT = 1000;
      public static final boolean DEFAULT_GZIP_PAYLOAD = true;
      @RibbonClientName
      private String name = "client";
      @Autowired
      private PropertiesFactory propertiesFactory;
    
      public RibbonClientConfiguration() {
      }
    
    //初始默认负载均衡配置Bean DefaultClientConfigImpl 
      @Bean
      @ConditionalOnMissingBean
      public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, 1000);
        config.set(CommonClientConfigKey.ReadTimeout, 1000);
        config.set(CommonClientConfigKey.GZipPayload, true);
        return config;
      }
    
    //初始默认负载均衡策略Bean ZoneAvoidanceRule
      @Bean
      @ConditionalOnMissingBean
      public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, this.name)) {
          return (IRule)this.propertiesFactory.get(IRule.class, config, this.name);
        } else {
          ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
          rule.initWithNiwsConfig(config);
          return rule;
        }
      }
    
    //初始默认服务健康检查IPing对象
      @Bean
      @ConditionalOnMissingBean
      public IPing ribbonPing(IClientConfig config) {
        return (IPing)(this.propertiesFactory.isSet(IPing.class, this.name) ? (IPing)this.propertiesFactory.get(IPing.class, config, this.name) : new DummyPing());
      }
    
    //初始默认服务实例清单加载方式
      @Bean
      @ConditionalOnMissingBean
      public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, this.name)) {
          return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.name);
        } else {
          ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
          serverList.initWithNiwsConfig(config);
          return serverList;
        }
      }
    //初始默认实例更新对象
      @Bean
      @ConditionalOnMissingBean
      public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
      }
    
    //初始默认负载均衡器实现 ZoneAwareLoadBalancer
      @Bean
      @ConditionalOnMissingBean
      public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
      }
    
    //初始默认服务实例过滤算法 ZonePreferenceServerListFilter
      @Bean
      @ConditionalOnMissingBean
      public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerListFilter.class, this.name)) {
          return (ServerListFilter)this.propertiesFactory.get(ServerListFilter.class, config, this.name);
        } else {
          ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
          filter.initWithNiwsConfig(config);
          return filter;
        }
      }
    
      @Bean
      @ConditionalOnMissingBean
      public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
      }
    
    //省略
    
    

    自定义负载均衡举例

    @Configuration
    @AutoConfigureBefore(RibbonClientConfiguration.class)
    public class MyLoadBalanceConfiguration {
      //省略一大堆
    
      @Bean
      public IRule ribbonRule(IClientConfig config) {
          //自定义负载均衡算法
          return discoveryEnabledRule;
        }
      }
    }
    
    

    通过配置应用自定义负载均衡实现,并事先在默认RibbonClientConfiguration Ribbon负载均衡初始化之前完成配置的加载,即可实现自定义负载均衡策略。

    与Nacos结合

    前面已经提到,Spring Cloud Alibaba 默认集成了Ribbon,当Spring Cloud 应用与Spring Cloud Alibaba Nacos Discovery 集成时,会自动触发Nacos中实现的对Ribbon的自动化配置(通过开关ribbon.nacos.enabled控制是否自动触发,默认true)。ServerList的维护机制将被NacosServerList覆盖,服务实例清单列表交给Nacos的服务治理机制维护。

    Nacos默认仍使用的是Spring Cloud Ribbon默认的负载均衡实现,只是扩展了服务实例清单维护机制。

    相关文章

      网友评论

          本文标题:Spring Cloud 客户端负载均衡

          本文链接:https://www.haomeiwen.com/subject/kwofpktx.html