Ribbon Internals Explained

Author: aiwen2017 | Published: 2019-04-22 21:07

    I. Core Interfaces

    1. ILoadBalancer
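      Ribbon exposes server selection through the ILoadBalancer interface, which picks a suitable Server according to the configured load-balancing strategy (IRule) and returns it to the caller. Its core methods are: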

      public interface ILoadBalancer {
      
          public void addServers(List<Server> newServers);
      
          public Server chooseServer(Object key);
          
          public void markServerDown(Server server);
          
          public List<Server> getReachableServers();
      
          public List<Server> getAllServers();
      }
      

      The default implementation is ZoneAwareLoadBalancer. The related class hierarchy:

      [Figure: ILoadBalancer class diagram]
    2. IRule
      IRule abstracts the load-balancing strategy; ILoadBalancer returns a Server by calling IRule's choose() method. Its core methods are:

      public interface IRule{
          
          public Server choose(Object key);
          
          public void setLoadBalancer(ILoadBalancer lb);
          
          public ILoadBalancer getLoadBalancer();    
      }
      

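      Implementations include:

      • BestAvailableRule
        Skips servers whose circuit breaker is tripped and picks the one with the fewest concurrent requests among the rest.
      • ClientConfigEnabledRoundRobinRule, RoundRobinRule
        Round robin.
      • RandomRule
        Random selection.
      • RetryRule
        A retrying wrapper around another rule; it retries the wrapped rule (round robin by default).
      • WeightedResponseTimeRule
        Weights servers by response time: the shorter the response time, the larger the weight.
      • AvailabilityFilteringRule
        Filters out servers whose circuit breaker is tripped because of consecutive connection or read failures, as well as servers whose connection count exceeds the limit, then round-robins over the remaining servers.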
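
      The weighting idea behind WeightedResponseTimeRule can be illustrated with a small, self-contained sketch. It is not Ribbon's code: the class ResponseTimeWeights and its methods are hypothetical, and it assumes that each server's weight is the sum of all average response times minus its own average, with selection done over cumulative weights.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of response-time-based weighting; not Ribbon's implementation.
public class ResponseTimeWeights {

    // weight_i = total - avgResponseTime_i, accumulated into a running sum,
    // so a faster server owns a wider slice of [0, lastCumulativeWeight).
    public static List<Double> cumulativeWeights(double[] avgResponseTimesMs) {
        double total = 0;
        for (double t : avgResponseTimesMs) {
            total += t;
        }
        List<Double> cumulative = new ArrayList<>();
        double running = 0;
        for (double t : avgResponseTimesMs) {
            running += total - t;
            cumulative.add(running);
        }
        return cumulative;
    }

    // Returns the index of the first slice whose upper bound is above the point.
    public static int pick(List<Double> cumulative, double point) {
        for (int i = 0; i < cumulative.size(); i++) {
            if (point < cumulative.get(i)) {
                return i;
            }
        }
        return cumulative.size() - 1;
    }

    public static void main(String[] args) {
        // Averages of 10, 40 and 50 ms give the slices [0,90), [90,150), [150,200).
        List<Double> weights = cumulativeWeights(new double[] {10, 40, 50});
        double point = Math.random() * weights.get(weights.size() - 1);
        System.out.println("picked server index " + pick(weights, point));
    }
}
```

      With these numbers the fastest server owns 90 of the 200 weight units, so a uniformly random point lands on it most often.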

      The related class diagram:

      [Figure: IRule class diagram]
    3. IPing
      IPing checks whether a Server is available. The ILoadBalancer implementation keeps a Timer that checks server availability every 10 s. Its core method is:

      public interface IPing {
      
          public boolean isAlive(Server server);
      }
      

      Its implementations:

      [Figure: IPing class diagram]
    4. IClientConfig
      IClientConfig defines the configuration used to initialize the various clients and load balancers; its implementation is DefaultClientConfigImpl.

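    II. How Load Balancing Is Implemented

    1. Choosing a Server

    The main implementations of ILoadBalancer are BaseLoadBalancer and ZoneAwareLoadBalancer. ZoneAwareLoadBalancer is a subclass of BaseLoadBalancer (via DynamicServerListLoadBalancer) and overrides chooseServer(). As its name suggests, it deals with zones: when there is only one available zone (the default setup), it simply calls the parent chooseServer() method. Source: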

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            // delegate to the parent BaseLoadBalancer.chooseServer()
            return super.chooseServer(key);
        }
        
        // omitted
    }
    

    BaseLoadBalancer.chooseServer() delegates directly to IRule.choose():

    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
    

    Here the IRule implementation is ZoneAvoidanceRule; its choose() method is implemented in the parent class PredicateBasedRule:

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
    

    As the source shows, it first calls ILoadBalancer.getAllServers() to obtain the full server list. getAllServers() is implemented in BaseLoadBalancer, which keeps a List<Server> field named allServerList where all servers are cached. With the list in hand, it calls chooseRoundRobinAfterFiltering(), which filters the candidates by loadBalancerKey and then picks one by round robin. The relevant source:

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
    
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }
    

    The round-robin index is thus a counter that advances by one on each call and is taken modulo the number of eligible servers.
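
    To see the counter in action, the same compare-and-set loop can be run on its own. This is a minimal sketch: the class RoundRobinSketch and the use of plain strings as servers are assumptions made for illustration, not Ribbon types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the round-robin selection shown above.
public class RoundRobinSketch {

    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Returns the next server in rotation, or null when nothing is eligible.
    public String choose(List<String> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }

    // Lock-free: retry until this thread wins the compare-and-set, and skip a
    // stale index that is out of range because the list shrank in the meantime.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a", "b", "c");
        for (int i = 0; i < 5; i++) {
            System.out.println(rule.choose(servers));
        }
    }
}
```

    The `current < modulo` check matters only when the eligible list shrinks between calls; in that case the loop advances once more instead of returning an out-of-range index.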

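    2. Detecting Server State

    BaseLoadBalancer caches all servers in allServerList, but their state can change, for example when a server becomes unavailable, and Ribbon has to notice promptly. How does it do that?
    The BaseLoadBalancer constructor sets up a Timer that runs a PingTask every 10 s (the value of pingIntervalSeconds). The relevant source: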

    public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
                IPing ping, IPingStrategy pingStrategy) {
        
        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }
        
    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }
    
    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }
    

    Following the source of Pinger and SerialPingStrategy shows that availability is ultimately decided by the IPing implementation NIWSDiscoveryPing. Its isAlive() method reports a server as alive when the status of the InstanceInfo associated with that server is UP:

    public boolean isAlive(Server server) {
        boolean isAlive = true;
        if (server!=null && server instanceof DiscoveryEnabledServer){
            DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
            InstanceInfo instanceInfo = dServer.getInstanceInfo();
            if (instanceInfo!=null){                    
                InstanceStatus status = instanceInfo.getStatus();
                if (status!=null){
                    // alive only if the status is UP
                    isAlive = status.equals(InstanceStatus.UP);
                }
            }
        }
        return isAlive;
    }
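
    What one ping pass does with such a check can be sketched as follows. This is a simplified model under stated assumptions: Node stands in for Server, the status string stands in for Eureka's InstanceStatus, and runPinger() is a hypothetical name; Ribbon's own Pinger does more bookkeeping than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of one serial ping pass; not Ribbon's Pinger.
public class PingPassSketch {

    // Stand-in for a server: an id, a registry status and a mutable alive flag.
    public static final class Node {
        public final String id;
        public final String status;
        public volatile boolean alive = true;

        public Node(String id, String status) {
            this.id = id;
            this.status = status;
        }
    }

    // Pings every node in order, records the result on the node and
    // returns the nodes that answered as alive.
    public static List<Node> runPinger(List<Node> all, Predicate<Node> ping) {
        List<Node> up = new ArrayList<>();
        for (Node node : all) {
            boolean ok;
            try {
                ok = ping.test(node);
            } catch (RuntimeException e) {
                ok = false; // a failing ping counts as "not alive"
            }
            node.alive = ok;
            if (ok) {
                up.add(node);
            }
        }
        return up;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node("a", "UP"), new Node("b", "DOWN"));
        List<Node> up = runPinger(nodes, node -> "UP".equals(node.status));
        System.out.println("alive nodes: " + up.size());
    }
}
```

    In a scheduled setup, a pass like this would be the body of the periodic task, so the alive flags lag reality by at most one ping interval.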
    

    III. Using Ribbon

    1. RestTemplate + @LoadBalanced

    • Usage
      Declare a RestTemplate bean annotated with @LoadBalanced and use it to send requests:

      @Configuration
      public class Config {
      
          @Bean
          @LoadBalanced
          RestTemplate restTemplate() {
              // a RestTemplate bean annotated with @LoadBalanced
              return new RestTemplate();
          }
      }
      
      @RestController
      public class HelloController {
      
          @Resource
          private RestTemplate restTemplate;
          
          @GetMapping("/hi")
          public String hi() {
              // use it directly; the host part is the service name
              return restTemplate.getForEntity("http://Eureka-Producer/hello", String.class).getBody();
          }
      }
      
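    • How it works
      When LoadBalancerAutoConfiguration is initialized, it adds a LoadBalancerInterceptor to every RestTemplate bean annotated with @LoadBalanced. The interceptor is stored in the List<ClientHttpRequestInterceptor> interceptors field of InterceptingHttpAccessor, a parent class of RestTemplate. The RestTemplate class diagram:

      [Figure: RestTemplate class diagram]
      The source that installs LoadBalancerInterceptor: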
      @Configuration
      @ConditionalOnClass(RestTemplate.class)
      @ConditionalOnBean(LoadBalancerClient.class)
      @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
      public class LoadBalancerAutoConfiguration {
      
          // 1. Collect every RestTemplate annotated with @LoadBalanced
          @LoadBalanced
          @Autowired(required = false)
          private List<RestTemplate> restTemplates = Collections.emptyList();
      
          @Bean
          public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                  final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
              return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                  for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                      for (RestTemplateCustomizer customizer : customizers) {
                          // 3. Run customize() on each RestTemplate
                          customizer.customize(restTemplate);
                      }
                  }
              });
          }
      
          @Bean
          @ConditionalOnMissingBean
          public LoadBalancerRequestFactory loadBalancerRequestFactory(
                  LoadBalancerClient loadBalancerClient) {
              return new LoadBalancerRequestFactory(loadBalancerClient, transformers);
          }
      
          @Configuration
          @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
          static class LoadBalancerInterceptorConfig {
              @Bean
              public LoadBalancerInterceptor ribbonInterceptor(
                      LoadBalancerClient loadBalancerClient,
                      LoadBalancerRequestFactory requestFactory) {
                  // 2. Create the LoadBalancerInterceptor bean
                  return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
              }
      
              @Bean
              @ConditionalOnMissingBean
              public RestTemplateCustomizer restTemplateCustomizer(
                      final LoadBalancerInterceptor loadBalancerInterceptor) {
                  return restTemplate -> {
                      // 4. customize() adds the LoadBalancerInterceptor to the RestTemplate
                      List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                              restTemplate.getInterceptors());
                      list.add(loadBalancerInterceptor);
                      restTemplate.setInterceptors(list);
                  };
              }
          }
          // omitted
      }
      

      As the source shows, the LoadBalancerInterceptor constructor takes two arguments: a LoadBalancerClient and a LoadBalancerRequestFactory. The LoadBalancerRequestFactory bean is registered in this configuration, but the LoadBalancerClient is not. Where is it created? In RibbonAutoConfiguration, which registers a RibbonLoadBalancerClient (the LoadBalancerClient implementation) and a SpringClientFactory:

      @Bean
      public SpringClientFactory springClientFactory() {
          SpringClientFactory factory = new SpringClientFactory();
          factory.setConfigurations(this.configurations);
          return factory;
      }
      
      @Bean
      @ConditionalOnMissingBean(LoadBalancerClient.class)
      public LoadBalancerClient loadBalancerClient() {
          return new RibbonLoadBalancerClient(springClientFactory());
      }
      

      At this point the LoadBalancerInterceptor has been created and stored in the RestTemplate's interceptor list. How does RestTemplate use it? Every request sent through RestTemplate ends up in doExecute(), which creates a ClientHttpRequest and calls its execute() method:

      protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
              @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
      
          ClientHttpResponse response = null;
          try {
              // 1. Create the ClientHttpRequest.
              ClientHttpRequest request = createRequest(url, method);
              if (requestCallback != null) {
                  requestCallback.doWithRequest(request);
              }
              // 2. Call its execute() method to obtain the response.
              response = request.execute();
              handleResponse(url, method, response);
              return (responseExtractor != null ? responseExtractor.extractData(response) : null);
          }
          catch (IOException ex) {
              String resource = url.toString();
              String query = url.getRawQuery();
              resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
              throw new ResourceAccessException("I/O error on " + method.name() +
                      " request for \"" + resource + "\": " + ex.getMessage(), ex);
          }
          finally {
              if (response != null) {
                  response.close();
              }
          }
      }
      
      protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
          ClientHttpRequest request = getRequestFactory().createRequest(url, method);
          if (logger.isDebugEnabled()) {
              logger.debug("HTTP " + method.name() + " " + url);
          }
          return request;
      }
      
      @Override
      public ClientHttpRequestFactory getRequestFactory() {
          List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
          if (!CollectionUtils.isEmpty(interceptors)) {
              ClientHttpRequestFactory factory = this.interceptingRequestFactory;
              if (factory == null) {
                  factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
                  this.interceptingRequestFactory = factory;
              }
              return factory;
          }
          else {
              return super.getRequestFactory();
          }
      }
      

      getRequestFactory() shows that when interceptors is not empty, the ClientHttpRequest is created by InterceptingClientHttpRequestFactory.createRequest(), and the interceptors list is passed into the InterceptingClientHttpRequestFactory. Its createRequest() method:

      public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
      
          private final List<ClientHttpRequestInterceptor> interceptors;
      
          public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
                  @Nullable List<ClientHttpRequestInterceptor> interceptors) {
      
              super(requestFactory);
              this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
          }
      
          @Override
          protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
              // simply returns an InterceptingClientHttpRequest
              return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
          }
      
      }
      

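      The interceptors thus end up in InterceptingClientHttpRequest. As noted above, RestTemplate.doExecute() creates the InterceptingClientHttpRequest and calls its execute() method to get the response. That call reaches the execute() method of its inner class InterceptingRequestExecution, which invokes the next interceptor's intercept() method, here LoadBalancerInterceptor.intercept():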

      public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
          if (this.iterator.hasNext()) {
              ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
              // invokes the interceptor, here LoadBalancerInterceptor.intercept()
              return nextInterceptor.intercept(request, body, this);
          }
          // omitted
      }
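
      The dispatch above is a chain of responsibility: each interceptor receives the request plus a handle to the rest of the chain. A minimal model, using plain strings instead of Spring's request and response types (every name in it is a hypothetical stand-in):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of an interceptor chain; not Spring's classes.
public class InterceptorChainSketch {

    public interface Interceptor {
        String intercept(String request, Execution execution);
    }

    // Walks the interceptors one by one; when none are left, "sends" the request.
    public static final class Execution {
        private final Iterator<Interceptor> iterator;

        public Execution(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        public String execute(String request) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(request, this);
            }
            return "sent:" + request; // stand-in for the real HTTP call
        }
    }

    public static String demo() {
        // Decorates the response on the way back.
        Interceptor logging = (request, execution) -> execution.execute(request) + "|logged";
        // Rewrites the service name to a concrete address before continuing.
        Interceptor loadBalancing = (request, execution) ->
                execution.execute(request.replace("Eureka-Producer", "10.0.0.5:8081"));
        return new Execution(Arrays.asList(logging, loadBalancing))
                .execute("http://Eureka-Producer/hello");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

      An interceptor that never calls execution.execute() short-circuits the chain, which is why a load-balancing interceptor can take over the request entirely.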
      

      In other words, the RestTemplate request is handed to the interceptor chain held by InterceptingClientHttpRequest. How does the interceptor use Ribbon to handle the request? Here is LoadBalancerInterceptor with its intercept() method:

      public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
      
          private LoadBalancerClient loadBalancer;
          private LoadBalancerRequestFactory requestFactory;
      
          public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
              this.loadBalancer = loadBalancer;
              this.requestFactory = requestFactory;
          }
      
          public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
              // for backwards compatibility
              this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
          }
      
          @Override
          public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                  final ClientHttpRequestExecution execution) throws IOException {
              final URI originalUri = request.getURI();
              String serviceName = originalUri.getHost();
              // delegates directly to LoadBalancerClient.execute()
              return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
          }
      }
      

      LoadBalancerInterceptor.intercept() delegates directly to LoadBalancerClient.execute(). LoadBalancerClient is an interface; the implementation here is RibbonLoadBalancerClient. As mentioned above, RibbonAutoConfiguration registers the RibbonLoadBalancerClient bean, which is passed to LoadBalancerInterceptor through its constructor. Next comes RibbonLoadBalancerClient.execute(); first, its class diagram:

      [Figure: LoadBalancerClient class diagram]

      RibbonLoadBalancerClient.execute() first obtains an ILoadBalancer by calling SpringClientFactory.getLoadBalancer(). How does that method return an ILoadBalancer? It simply looks it up in the Spring context. And when was the ILoadBalancer registered there? In RibbonClientConfiguration, which registers the following beans:

      • ZoneAwareLoadBalancer as the ILoadBalancer.
      • ZoneAvoidanceRule as the IRule.
      • DefaultClientConfigImpl as the IClientConfig.

      In addition, EurekaRibbonClientConfiguration registers:

      • DomainExtractingServerList, wrapping DiscoveryEnabledNIWSServerList, as the ServerList.
      • NIWSDiscoveryPing as the IPing.

      Source:

      @Bean
      @ConditionalOnMissingBean
      public IClientConfig ribbonClientConfig() {
          DefaultClientConfigImpl config = new DefaultClientConfigImpl();
          config.loadProperties(this.name);
          config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
          config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
          config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
          return config;
      }
      
      @Bean
      @ConditionalOnMissingBean
      public IRule ribbonRule(IClientConfig config) {
          if (this.propertiesFactory.isSet(IRule.class, name)) {
              return this.propertiesFactory.get(IRule.class, config, name);
          }
          ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
          rule.initWithNiwsConfig(config);
          return rule;
      }
      
      @Bean
      @ConditionalOnMissingBean
      public ServerList<Server> ribbonServerList(IClientConfig config) {
          if (this.propertiesFactory.isSet(ServerList.class, name)) {
              return this.propertiesFactory.get(ServerList.class, config, name);
          }
          ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
          serverList.initWithNiwsConfig(config);
          return serverList;
      }
      
      @Bean
      @ConditionalOnMissingBean
      public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
              ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
              IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
          if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
              return this.propertiesFactory.get(ILoadBalancer.class, config, name);
          }
          return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                  serverListFilter, serverListUpdater);
      }
      
      @Bean
      @ConditionalOnMissingBean
      public IPing ribbonPing(IClientConfig config) {
          if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
              return this.propertiesFactory.get(IPing.class, config, serviceId);
          }
          NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
          ping.initWithNiwsConfig(config);
          return ping;
      }
      
      @Bean
      @ConditionalOnMissingBean
      public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
          if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
              return this.propertiesFactory.get(ServerList.class, config, serviceId);
          }
          DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                  config, eurekaClientProvider);
          DomainExtractingServerList serverList = new DomainExtractingServerList(
                  discoveryServerList, config, this.approximateZoneFromHostname);
          return serverList;
      }
      

      The ZoneAwareLoadBalancer constructor obtains the server list by calling DiscoveryEnabledNIWSServerList.getUpdatedListOfServers(). DiscoveryEnabledNIWSServerList holds a Provider<EurekaClient> field named eurekaClientProvider, which supplies the CloudEurekaClient instance (the EurekaClient implementation). getUpdatedListOfServers() calls CloudEurekaClient.getInstancesByVipAddress() to read the InstanceInfo list for the application from the Eureka client cache:

      // supplies the CloudEurekaClient instance (the EurekaClient implementation)
      private final Provider<EurekaClient> eurekaClientProvider;
      
      @Override
      public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
          return obtainServersViaDiscovery();
      }
      
      private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
          List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
      
          if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
              logger.warn("EurekaClient has not been initialized yet, returning an empty list");
              return new ArrayList<DiscoveryEnabledServer>();
          }
      
          EurekaClient eurekaClient = eurekaClientProvider.get();
          if (vipAddresses!=null){
              for (String vipAddress : vipAddresses.split(",")) {
                  // if targetRegion is null, it will be interpreted as the same region of client
                  List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                  for (InstanceInfo ii : listOfInstanceInfo) {
                      if (ii.getStatus().equals(InstanceStatus.UP)) {
      
                          if(shouldUseOverridePort){
                              if(logger.isDebugEnabled()){
                                  logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                              }
      
                              InstanceInfo copy = new InstanceInfo(ii);
      
                              if(isSecure){
                                  ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                              }else{
                                  ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                              }
                          }
      
                          DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
                          serverList.add(des);
                      }
                  }
                  if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                      break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                  }
              }
          }
          return serverList;
      }
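
      Stripped of the port-override handling, the lookup keeps only instances that are UP and, when prioritizeVipAddressBasedServers is set, stops at the first VIP address that yields any server. A simplified sketch of that filtering, in which the Instance class, the in-memory registry map and sampleRegistry() are hypothetical stand-ins for the Eureka client:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the VIP-address lookup; no Eureka classes involved.
public class VipLookupSketch {

    public static final class Instance {
        public final String host;
        public final boolean up;

        public Instance(String host, boolean up) {
            this.host = host;
            this.up = up;
        }
    }

    // Collects the hosts of UP instances for a comma-separated VIP address list.
    public static List<String> obtainServers(String vipAddresses,
            Map<String, List<Instance>> registry, boolean prioritizeVipAddressBasedServers) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            for (Instance instance : registry.getOrDefault(vip, Collections.<Instance>emptyList())) {
                if (instance.up) {
                    servers.add(instance.host);
                }
            }
            if (!servers.isEmpty() && prioritizeVipAddressBasedServers) {
                break; // earlier VIP addresses win; later ones are only a fallback
            }
        }
        return servers;
    }

    // Made-up sample data for the demo.
    public static Map<String, List<Instance>> sampleRegistry() {
        Map<String, List<Instance>> registry = new HashMap<>();
        registry.put("producer", Arrays.asList(
                new Instance("10.0.0.1", true), new Instance("10.0.0.2", false)));
        registry.put("producer-backup", Arrays.asList(new Instance("10.0.1.1", true)));
        return registry;
    }

    public static void main(String[] args) {
        System.out.println(obtainServers("producer,producer-backup", sampleRegistry(), true));
        System.out.println(obtainServers("producer,producer-backup", sampleRegistry(), false));
    }
}
```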
      

      After obtaining the ILoadBalancer through SpringClientFactory.getLoadBalancer(), RibbonLoadBalancerClient.execute() calls its chooseServer() method to get a Server:

      public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
          // 1. Obtain the ILoadBalancer
          ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
          
          // 2. Choose a Server through the ILoadBalancer
          Server server = getServer(loadBalancer, hint);
          if (server == null) {
              throw new IllegalStateException("No instances available for " + serviceId);
          }
          RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                  serviceId), serverIntrospector(serviceId).getMetadata(server));
      
          // 3. Send the request to that Server
          return execute(serviceId, ribbonServer, request);
      }
      
      protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
          if (loadBalancer == null) {
              return null;
          }
          // Use 'default' on a null hint, or just pass it on?
          return loadBalancer.chooseServer(hint != null ? hint : "default");
      }
      

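      ZoneAwareLoadBalancer.chooseServer() returns a Server by calling ZoneAvoidanceRule.choose(). ZoneAvoidanceRule extends PredicateBasedRule, which in turn extends ClientConfigEnabledRoundRobinRule, so it picks a Server by round robin from the list obtained by ZoneAwareLoadBalancer. Finally, the request is sent to that Server's address.
      The related class diagram:

      [Figure: class diagram]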
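
      The last step, sending the request to the chosen server, amounts to swapping the service name in the URI for a concrete host and port. A minimal sketch with java.net.URI; the class ServiceUriRewrite and the address 10.0.0.5:8081 are made up for illustration, and Spring Cloud's own reconstruction also deals with details such as switching to https:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: replaces the logical service name with a real address.
public class ServiceUriRewrite {

    // Keeps scheme, path, query and fragment; swaps in the chosen host and port.
    public static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://Eureka-Producer/hello?name=a");
        // getHost() yields the service name that is used to look up the load balancer.
        System.out.println("service name: " + logical.getHost());
        System.out.println("rewritten:    " + withServer(logical, "10.0.0.5", 8081));
    }
}
```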

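    2. Feign Interfaces

    Compared with RestTemplate + @LoadBalanced, Feign interfaces are used more often with Spring Cloud because they are simpler to use. An example of calling a service through a Feign interface:

    // define the Feign interface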
    @FeignClient(value = "Eureka-Producer", fallbackFactory = HelloClientFallbackFactory.class)
    public interface HelloClient {
    
        @GetMapping("/hello")
        String hello();
    }
    
    // fallback returned when the circuit breaker fails fast
    @Component
    public class HelloClientFallbackFactory implements FallbackFactory<HelloClient>, HelloClient {
    
        @Override
        public HelloClient create(Throwable throwable) {
            return this;
        }
    
        @Override
        public String hello() {
            return "fallback";
        }
    }
    
    // usage
    @RestController
    public class HelloController {
    
        @Resource
        private HelloClient helloClient;
    
        @GetMapping("/hello")
        public String hello() {
            return helloClient.hello();
        }
    }
    

    Just as RestTemplate obtains a Server and executes the request through RibbonLoadBalancerClient, a Feign interface does so through LoadBalancerFeignClient. DefaultFeignLoadBalancedConfiguration registers the LoadBalancerFeignClient bean:

    @Configuration
    class DefaultFeignLoadBalancedConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
                                  SpringClientFactory clientFactory) {
            return new LoadBalancerFeignClient(new Client.Default(null, null),
                    cachingFactory, clientFactory);
        }
    }
    

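    How does a Feign interface use LoadBalancerFeignClient for load-balanced calls? As described in the companion article "Feign Source Analysis" (《Feign源码解析》), the proxy implementation of a Feign interface is produced by FeignClientFactoryBean. FeignClientFactoryBean implements FactoryBean, so its getObject() method returns the proxy, and getObject() obtains the LoadBalancerFeignClient from the Spring context: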

    @Override
    public Object getObject() throws Exception {
        return getTarget();
    }
    
    <T> T getTarget() {
        FeignContext context = applicationContext.getBean(FeignContext.class);
        Feign.Builder builder = feign(context);
    
        if (!StringUtils.hasText(this.url)) {
            if (!this.name.startsWith("http")) {
                url = "http://" + this.name;
            }
            else {
                url = this.name;
            }
            url += cleanPath();
            return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,
                    this.name, url));
        }
        if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
            this.url = "http://" + this.url;
        }
        String url = this.url + cleanPath();
        // look up the Client bean (LoadBalancerFeignClient) from the Spring context
        Client client = getOptional(context, Client.class);
        if (client != null) {
            if (client instanceof LoadBalancerFeignClient) {
                // not load balancing because we have a url,
                // but ribbon is on the classpath, so unwrap
                client = ((LoadBalancerFeignClient)client).getDelegate();
            }
            builder.client(client);
        }
        Targeter targeter = get(context, Targeter.class);
        return (T) targeter.target(this, builder, context, new HardCodedTarget<>(
                this.type, this.name, url));
    }
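
    The branching in getTarget() comes down to one question: was an explicit url configured? If not, the service name becomes the host and the call is load balanced; if so, the fixed URL is used and the load balancer is bypassed. A condensed sketch of that decision, where FeignTargetUrl and its methods are hypothetical helpers rather than Spring Cloud code:

```java
// Hypothetical condensation of the URL decision in getTarget().
public class FeignTargetUrl {

    private static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    // True when no explicit url is set, so the name is resolved by the load balancer.
    public static boolean isLoadBalanced(String url) {
        return !hasText(url);
    }

    // Builds the target URL from the client name, the optional url and the path.
    public static String resolve(String name, String url, String path) {
        String base = hasText(url) ? url : name;
        if (!base.startsWith("http")) {
            base = "http://" + base;
        }
        return base + path;
    }

    public static void main(String[] args) {
        System.out.println(resolve("Eureka-Producer", "", "") + " loadBalanced="
                + isLoadBalanced(""));
        System.out.println(resolve("Eureka-Producer", "localhost:8080", "/api") + " loadBalanced="
                + isLoadBalanced("localhost:8080"));
    }
}
```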
    

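    LoadBalancerFeignClient serves requests through its execute() method. When does a Feign interface call it? As the same article explains, SynchronousMethodHandler, the MethodHandler implementation, intercepts calls to the Feign interface and runs its invoke() method, which in turn calls LoadBalancerFeignClient.execute() to send the request: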

    @Override
    public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        Retryer retryer = this.retryer.clone();
        while (true) {
          try {
            return executeAndDecode(template);
          } catch (RetryableException e) {
            // omitted
            continue;
          }
        }
    }
    
    Object executeAndDecode(RequestTemplate template) throws Throwable {
        Request request = targetRequest(template);
        
        if (logLevel != Logger.Level.NONE) {
          logger.logRequest(metadata.configKey(), logLevel, request);
        }
        
        Response response;
        long start = System.nanoTime();
        try {
          // call LoadBalancerFeignClient.execute() to get the response
          response = client.execute(request, options);
        } catch (IOException e) {
          if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
          }
          throw errorExecuting(request, e);
        }
        
        // omitted
    }
    

    How does LoadBalancerFeignClient.execute() use Ribbon for load balancing? It calls CachingSpringLoadBalancerFactory.create() to obtain a FeignLoadBalancer, which holds an ILoadBalancer instance. CachingSpringLoadBalancerFactory gets that instance from the Spring context through SpringClientFactory.getLoadBalancer():

    public FeignLoadBalancer create(String clientName) {
        FeignLoadBalancer client = this.cache.get(clientName);
        if(client != null) {
            return client;
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        client = loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }
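
    The create() method is a per-client-name cache: build the balancer on first use, then hand back the same instance. A generic sketch of that pattern follows. Note the swapped-in technique: it uses ConcurrentHashMap.computeIfAbsent() instead of the get-then-put sequence in the source, and PerClientCache is a hypothetical class, not part of Spring Cloud.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-name cache, illustrating the idea behind create().
public class PerClientCache<V> {

    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    public PerClientCache(Function<String, V> factory) {
        this.factory = factory;
    }

    // Returns the cached value for the client name, creating it on first use.
    public V create(String clientName) {
        return cache.computeIfAbsent(clientName, factory);
    }

    public static void main(String[] args) {
        PerClientCache<StringBuilder> balancers =
                new PerClientCache<>(name -> new StringBuilder("balancer-for-" + name));
        StringBuilder first = balancers.create("Eureka-Producer");
        StringBuilder second = balancers.create("Eureka-Producer");
        System.out.println("same instance: " + (first == second));
    }
}
```

    Caching by client name means each target service keeps one balancer, and with it one server list and one set of statistics, across all calls.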
    

    Once the FeignLoadBalancer is available, its executeWithLoadBalancer() method is called:

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);
    
        IClientConfig requestConfig = getClientConfig(options, clientName);
        // run FeignLoadBalancer.executeWithLoadBalancer()
        return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                requestConfig).toResponse();
        // omitted
    }
    // create the FeignLoadBalancer (or fetch it from the cache) and return it
    private FeignLoadBalancer lbClient(String clientName) {
        return this.lbClientFactory.create(clientName);
    }
    

    executeWithLoadBalancer() is implemented in AbstractLoadBalancerAwareClient, the parent class of FeignLoadBalancer:

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            // (omitted)
        }
    }
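
Inside the ServerOperation callback, the chosen Server is merged with the host-less request URI before the request is sent. A minimal sketch of that step, using only java.net.URI, might look like the following. The class ReconstructUriSketch, the method withServer(), and the sample address are hypothetical; the real reconstructURIWithServer() handles more cases (for example the secure-port and scheme derivation) than this sketch does.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ReconstructUriSketch {

    // Combine the host and port of the chosen server with the path and
    // query of the host-less request URI. Falls back to "http" when the
    // partial URI carries no scheme (an assumption of this sketch).
    static URI withServer(String host, int port, URI partial) {
        String scheme = partial.getScheme() != null ? partial.getScheme() : "http";
        try {
            return new URI(scheme, null, host, port,
                    partial.getPath(), partial.getQuery(), null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI partial = URI.create("/users/1?verbose=true");
        System.out.println(withServer("10.0.0.5", 8080, partial));
    }
}
```

The point of the design is that the caller never needs to know a concrete address: the URI stays abstract until a Server has been selected.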
    

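    executeWithLoadBalancer() creates a LoadBalancerCommand and submits a ServerOperation to it via submit(). Tracing LoadBalancerCommand.submit() shows that it calls selectServer() to obtain a Server, and selectServer() in turn delegates to getServerFromLoadBalancer() of LoadBalancerContext, an ancestor class of FeignLoadBalancer (it is the superclass of AbstractLoadBalancerAwareClient):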

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
    
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
        // (omitted)
        
        // When server is null, selectServer() is called to obtain a Server.
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        
                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        // (omitted)
                    }
                });
            // (omitted)
    }
    
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // Call LoadBalancerContext.getServerFromLoadBalancer() to obtain the Server
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
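
Stripped of RxJava, the control flow of submit() boils down to "use the pinned server if there is one, otherwise ask for one, then run the operation against it". The sketch below shows only that decision in plain Java. CommandSketch and its fields are hypothetical names, servers are represented as plain strings, and the retry and statistics handling of the real LoadBalancerCommand are deliberately left out.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class CommandSketch<T> {

    private final String fixedServer;            // may be null, like "server" in submit()
    private final Supplier<String> selectServer; // stand-in for selectServer()

    CommandSketch(String fixedServer, Supplier<String> selectServer) {
        this.fixedServer = fixedServer;
        this.selectServer = selectServer;
    }

    // Pick the server first, then hand it to the operation.
    T submit(Function<String, T> operation) {
        String server = (fixedServer == null) ? selectServer.get() : fixedServer;
        return operation.apply(server);
    }

    public static void main(String[] args) {
        // No pinned server: the supplier (the load balancer) decides.
        CommandSketch<String> viaLb = new CommandSketch<>(null, () -> "10.0.0.5:8080");
        System.out.println(viaLb.submit(s -> "GET http://" + s + "/users/1"));

        // Pinned server: the supplier is never consulted.
        CommandSketch<String> pinned = new CommandSketch<>("10.0.0.9:8080", () -> "unused");
        System.out.println(pinned.submit(s -> "GET http://" + s + "/users/1"));
    }
}
```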
    

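    FeignLoadBalancer and LoadBalancerCommand depend on and call into each other. In the end, getServerFromLoadBalancer() of LoadBalancerContext (an ancestor class of FeignLoadBalancer) returns the Server, and it does so by calling chooseServer() on the ILoadBalancer it holds. The source is as follows: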

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
        String host = null;
        int port = -1;
        if (original != null) {
            host = original.getHost();
        }
        if (original != null) {
            Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
            port = schemeAndPort.second();
        }
        // Get the ILoadBalancer
        ILoadBalancer lb = getLoadBalancer();
        // Call chooseServer() on the ILoadBalancer to obtain the Server.
        Server svc = lb.chooseServer(loadBalancerKey);
        if (svc == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,
                    "Load balancer does not have available server for client: "
                            + clientName);
        }
        host = svc.getHost();
        if (host == null){
            throw new ClientException(ClientException.ErrorType.GENERAL,
                    "Invalid Server for :" + svc);
        }
        logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
        return svc;
    }
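
The essential behaviour of getServerFromLoadBalancer() is "ask the load balancer for a server and fail loudly if there is none". The following sketch reproduces that with a tiny round-robin balancer. SimpleLoadBalancer is a hypothetical interface that stands in for ILoadBalancer, servers are plain strings, and IllegalStateException is used in place of ClientException so that the example needs no Ribbon dependency.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ChooseServerSketch {

    // Hypothetical stand-in for ILoadBalancer, reduced to chooseServer().
    interface SimpleLoadBalancer {
        String chooseServer(Object key);
    }

    // Round-robin over a fixed list; returns null when the list is empty.
    static SimpleLoadBalancer roundRobin(List<String> servers) {
        AtomicInteger next = new AtomicInteger();
        return key -> servers.isEmpty()
                ? null
                : servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
    }

    // Mirrors the null check in getServerFromLoadBalancer().
    static String getServer(SimpleLoadBalancer lb, String clientName) {
        String server = lb.chooseServer(null);
        if (server == null) {
            throw new IllegalStateException(
                    "Load balancer does not have available server for client: " + clientName);
        }
        return server;
    }

    public static void main(String[] args) {
        SimpleLoadBalancer lb = roundRobin(Arrays.asList("10.0.0.5:8080", "10.0.0.6:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(getServer(lb, "user-service"));
        }
    }
}
```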
    

    At this point we finally reach the code that obtains a Server through ILoadBalancer. The related class diagram is shown below:

    image

    IV. Summary

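    Ribbon provides load balancing through the ILoadBalancer interface. It works as follows:

    • ILoadBalancer relies on ServerList, which uses DiscoveryClient to fetch the server list from the Eureka Client, and caches that list.
    • IPing periodically checks the servers cached by ILoadBalancer to determine whether each one is available.
    • IRule is the abstraction of the load-balancing strategy; ILoadBalancer uses an IRule to pick a Server.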
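
To make the division of labour among the three roles concrete, here is a toy load balancer in plain Java. Everything in it is an assumption for illustration: MiniLoadBalancer is a made-up class, a Supplier plays the part of ServerList, a Predicate plays the part of IPing, a hard-coded round-robin plays the part of IRule, and refresh() is called by hand rather than from a timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

class MiniLoadBalancer {

    private final Supplier<List<String>> serverList; // role of ServerList
    private final Predicate<String> ping;            // role of IPing
    private final AtomicInteger counter = new AtomicInteger();
    private volatile List<String> upServers = new ArrayList<>();

    MiniLoadBalancer(Supplier<List<String>> serverList, Predicate<String> ping) {
        this.serverList = serverList;
        this.ping = ping;
        refresh();
    }

    // Re-fetch the list and keep only servers that pass the ping.
    // In Ribbon a timer drives this; here it is invoked manually.
    void refresh() {
        List<String> alive = new ArrayList<>();
        for (String s : serverList.get()) {
            if (ping.test(s)) {
                alive.add(s);
            }
        }
        upServers = alive;
    }

    // Role of IRule: plain round-robin over the servers that are up.
    String chooseServer() {
        List<String> candidates = upServers;
        if (candidates.isEmpty()) {
            return null;
        }
        return candidates.get(Math.floorMod(counter.getAndIncrement(), candidates.size()));
    }

    public static void main(String[] args) {
        MiniLoadBalancer lb = new MiniLoadBalancer(
                () -> Arrays.asList("a:8080", "b:8080", "c:8080"),
                server -> !server.startsWith("b")); // pretend b is down
        for (int i = 0; i < 4; i++) {
            System.out.println(lb.chooseServer());
        }
    }
}
```

With the sample data, b:8080 fails the ping, so the loop alternates between a:8080 and c:8080.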

    When services are called with RestTemplate plus @LoadBalanced, LoadBalancerInterceptor and RibbonLoadBalancerClient act as the bridge to Ribbon's load balancing.

    When services are called through Feign interfaces, LoadBalancerFeignClient and FeignLoadBalancer serve as the entry point into Ribbon and provide load balancing for those interfaces.

    Whichever approach is used, load balancing is ultimately performed through Ribbon's ILoadBalancer interface.

    Finally, two extras:

    1. Ribbon-related Configuration classes and the beans they register:

      • RibbonAutoConfiguration

        • Registers RibbonLoadBalancerClient, the implementation of LoadBalancerClient.
        • Registers SpringClientFactory.
      • LoadBalancerAutoConfiguration

        • Registers LoadBalancerInterceptor.
        • Adds the LoadBalancerInterceptor to RestTemplate.
      • RibbonClientConfiguration

        • Registers ZoneAwareLoadBalancer as the ILoadBalancer implementation.
        • Registers ZoneAvoidanceRule as the IRule implementation.
        • Registers DefaultClientConfigImpl as the IClientConfig implementation.
      • EurekaRibbonClientConfiguration

        • Registers NIWSDiscoveryPing as the IPing implementation.
        • Registers DiscoveryEnabledNIWSServerList as the ServerList implementation.
    2. Class relationship diagram for Ribbon:

    image
