Spring Cloud源码分析(二)Ribbon

作者: 程序猿DD | 来源:发表于2016-10-20 08:52 被阅读2803次

    断断续续看Ribbon的源码差不多也有7-8天了,总算告一段落。本文记录了这些天对源码的阅读过程与一些分析理解,如有不对还请指出。

    友情提示:本文较长,请选择一个较为舒适的姿势来阅读

    在之前介绍使用Ribbon进行服务消费的时候,我们用到了RestTemplate,但是熟悉Spring的同学们是否产生过这样的疑问:RestTemplate不是Spring自己就有的吗?跟Ribbon的客户端负载均衡又有什么关系呢?下面在本文,我们来看RestTemplateRibbon是如何联系起来并实现客户端负载均衡的。

    首先,回顾一下之前的消费者示例:我们是如何实现客户端负载均衡的?仔细观察一下代码之前的代码,我们可以发现在消费者的例子中,可能就是这个注解@LoadBalanced是之前没有接触过的,并且从命名上来看也与负载均衡相关。我们不妨以此为线索来看看源码实现的机制。

    @LoadBalanced注解源码的注释中,我们可以知道该注解用来给RestTemplate标记,以使用负载均衡的客户端(LoadBalancerClient)来配置它。

    通过搜索LoadBalancerClient,我们可以发现这是Spring Cloud中定义的一个接口:

    public interface LoadBalancerClient {
    
        ServiceInstance choose(String serviceId);
    
        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    
        URI reconstructURI(ServiceInstance instance, URI original);
    
    }
    

    从该接口中,我们可以通过定义的抽象方法来了解到客户端负载均衡器中应具备的几种能力:

    • ServiceInstance choose(String serviceId):根据传入的服务名serviceId,从负载均衡器中挑选一个对应服务的实例。
    • T execute(String serviceId, LoadBalancerRequest request) throws IOException:使用从负载均衡器中挑选出的服务实例来执行请求内容。
    • URI reconstructURI(ServiceInstance instance, URI original):为系统构建一个合适的“host:port”形式的URI。在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的“host:port”形式)进行请求,比如:http://myservice/path/to/service。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host的URI,而返回的URI内容则是通过ServiceInstance的服务实例详情拼接出的具体“host:post”形式的请求地址。

    顺着LoadBalancerClient接口的所属包org.springframework.cloud.client.loadbalancer,我们对其内容进行整理,可以得出如下图的关系:

    从类的命名上我们初步判断LoadBalancerAutoConfiguration为实现客户端负载均衡器的自动化配置类。通过查看源码,我们可以验证这一点假设:

    @Configuration
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    public class LoadBalancerAutoConfiguration {
    
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
    
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
                final List<RestTemplateCustomizer> customizers) {
            return new SmartInitializingSingleton() {
                @Override
                public void afterSingletonsInstantiated() {
                    for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                        for (RestTemplateCustomizer customizer : customizers) {
                            customizer.customize(restTemplate);
                        }
                    }
                }
            };
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(
                final LoadBalancerInterceptor loadBalancerInterceptor) {
            return new RestTemplateCustomizer() {
                @Override
                public void customize(RestTemplate restTemplate) {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                }
            };
        }
    
        @Bean
        public LoadBalancerInterceptor ribbonInterceptor(
                LoadBalancerClient loadBalancerClient) {
            return new LoadBalancerInterceptor(loadBalancerClient);
        }
    
    }
    

    LoadBalancerAutoConfiguration类头上的注解可以知道Ribbon实现的负载均衡自动化配置需要满足下面两个条件:

    • @ConditionalOnClass(RestTemplate.class)RestTemplate类必须存在于当前工程的环境中。
    • @ConditionalOnBean(LoadBalancerClient.class):在Spring的Bean工程中有必须有LoadBalancerClient的实现Bean。

    在该自动化配置类中,主要做了下面三件事:

    • 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
    • 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。
    • 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。

    接下来,我们看看LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            this.loadBalancer = loadBalancer;
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            return this.loadBalancer.execute(serviceName,
                    new LoadBalancerRequest<ClientHttpResponse>() {
                        @Override
                        public ClientHttpResponse apply(final ServiceInstance instance)
                                throws Exception {
                            HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                                    instance);
                            return execution.execute(serviceRequest, body);
                        }
                    });
        }
    
        private class ServiceRequestWrapper extends HttpRequestWrapper {
    
            private final ServiceInstance instance;
    
            public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance) {
                super(request);
                this.instance = instance;
            }
    
            @Override
            public URI getURI() {
                URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
                        this.instance, getRequest().getURI());
                return uri;
            }
        }
    }
    

    通过源码以及之前的自动化配置类,我们可以看到在拦截器中注入了LoadBalancerClient的实现。当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。由于我们在使用RestTemplate时候采用了服务名作为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute函数去根据服务名来选择实例并发起实际的请求。

    分析到这里,LoadBalancerClient还只是一个抽象的负载均衡器接口,所以我们还需要找到它的具体实现类来进一步分析。通过查看ribbon的源码,我们可以很容易的在org.springframework.cloud.netflix.ribbon包下找到对应的实现类:RibbonLoadBalancerClient

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                serviceId), serverIntrospector(serviceId).getMetadata(server));
    
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
        try {
            T returnVal = request.apply(ribbonServer);
            statsRecorder.recordStats(returnVal);
            return returnVal;
        }
        catch (IOException ex) {
            statsRecorder.recordStats(ex);
            throw ex;
        }
        catch (Exception ex) {
            statsRecorder.recordStats(ex);
            ReflectionUtils.rethrowRuntimeException(ex);
        }
        return null;
    }
    

    可以看到,在execute函数的实现中,第一步做的就是通过getServer根据传入的服务名serviceId去获得具体的服务实例:

    protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        return loadBalancer.chooseServer("default");
    }
    

    通过getServer函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用LoadBalancerClient接口中的choose函数,而是使用了ribbon自身的ILoadBalancer接口中定义的chooseServer函数。

    我们先来认识一下ILoadBalancer接口:

    public interface ILoadBalancer {
    
        public void addServers(List<Server> newServers);
    
        public Server chooseServer(Object key);
    
        public void markServerDown(Server server);
    
        public List<Server> getReachableServers();
    
        public List<Server> getAllServers();
    }
    

    可以看到,在该接口中定义了一个软负载均衡器需要的一系列抽象操作(未例举过期函数):

    • addServers:向负载均衡器中维护的实例列表增加服务实例。
    • chooseServer:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。
    • markServerDown:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
    • getReachableServers:获取当前正常服务的实例列表。
    • getAllServers:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。

    在该接口定义中涉及到的Server对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。

    而对于该接口的实现,我们可以整理出如上图所示的结构。我们可以看到BaseLoadBalancer类实现了基础的负载均衡,而DynamicServerListLoadBalancerZoneAwareLoadBalancer在负载均衡的策略上做了一些功能的扩展。

    那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration配置类,可以知道在整合时默认采用了ZoneAwareLoadBalancer来实现负载均衡器。

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping) {
        ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
                .withClientConfig(config).withRule(rule).withPing(ping)
                .withServerListFilter(serverListFilter).withDynamicServerList(serverList)
                .buildDynamicServerListLoadBalancer();
        return balancer;
    }
    

    下面,我们再回到RibbonLoadBalancerClientexecute函数逻辑,在通过ZoneAwareLoadBalancerchooseServer函数获取了负载均衡策略分配到的服务实例对象Server之后,将其内容包装成RibbonServer对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequestapply(final ServiceInstance instance)函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求,到实际访问host:post形式的具体地址的转换。

    apply(final ServiceInstance instance)函数中传入的ServiceInstance接口是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如:serviceId、host、port等,具体定义如下:

    public interface ServiceInstance {
    
        String getServiceId();
    
        String getHost();
    
        int getPort();
    
        boolean isSecure();
    
        URI getUri();
    
        Map<String, String> getMetadata();
    }
    

    而上面提到的具体包装Server服务实例的RibbonServer对象就是ServiceInstance接口的实现,可以看到它除了包含了Server对象之外,还存储了服务名、是否使用https标识以及一个Map类型的元数据集合。

    protected static class RibbonServer implements ServiceInstance {
    
        private final String serviceId;
        private final Server server;
        private final boolean secure;
        private Map<String, String> metadata;
    
        protected RibbonServer(String serviceId, Server server) {
            this(serviceId, server, false, Collections.<String, String> emptyMap());
        }
    
        protected RibbonServer(String serviceId, Server server, boolean secure,
                Map<String, String> metadata) {
            this.serviceId = serviceId;
            this.server = server;
            this.secure = secure;
            this.metadata = metadata;
        }
    
        // 省略实现ServiceInstance的一些获取Server信息的get函数
        ...
    }
    

    那么apply(final ServiceInstance instance)函数,在接收到了具体ServiceInstance实例后,是如何通过LoadBalancerClient接口中的reconstructURI操作来组织具体请求地址的呢?

    @Override
    public ClientHttpResponse apply(final ServiceInstance instance)
                throws Exception {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance);
        return execution.execute(serviceRequest, body);
    }
    

    apply的实现中,我们可以看到它具体执行的时候,还传入了ServiceRequestWrapper对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写后的getURI会通过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI来进行访问。

    private class ServiceRequestWrapper extends HttpRequestWrapper {
    
        private final ServiceInstance instance;
    
        ...
    
        @Override
        public URI getURI() {
            URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
                    this.instance, getRequest().getURI());
            return uri;
        }
    }
    

    LoadBalancerInterceptor拦截器中,ClientHttpRequestExecution的实例具体执行execution.execute(serviceRequest, body)时,会调用InterceptingClientHttpRequestInterceptingRequestExecution类的execute函数,具体实现如下:

    public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
        if (this.iterator.hasNext()) {
            ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
            return nextInterceptor.intercept(request, body, this);
        }
        else {
            ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
            delegate.getHeaders().putAll(request.getHeaders());
            if (body.length > 0) {
                StreamUtils.copy(body, delegate.getBody());
            }
            return delegate.execute();
        }
    }
    

    可以看到在创建请求的时候requestFactory.createRequest(request.getURI(), request.getMethod());,这里request.getURI()会调用之前介绍的ServiceRequestWrapper对象中重写的getURI函数。此时,它就会使用RibbonLoadBalancerClient中实现的reconstructURI来组织具体请求的服务实例地址。

    public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId();
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId);
        Server server = new Server(instance.getHost(), instance.getPort());
        boolean secure = isSecure(server, serviceId);
        URI uri = original;
        if (secure) {
            uri = UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();
        }
        return context.reconstructURIWithServer(server, uri);
    }
    

    reconstructURI函数中,我们可以看到,它通过ServiceInstance实例对象的serviceId,从SpringClientFactory类的clientFactory对象中获取对应serviceId的负载均衡器的上下文RibbonLoadBalancerContext对象。然后根据ServiceInstance中的信息来构建具体服务实例信息的Server对象,并使用RibbonLoadBalancerContext对象的reconstructURIWithServer函数来构建服务实例的URI。

    为了帮助理解,简单介绍一下上面提到的SpringClientFactoryRibbonLoadBalancerContext

    • SpringClientFactory类是一个用来创建客户端负载均衡器的工厂类,该工厂会为每一个不同名的ribbon客户端生成不同的Spring上下文。
    • RibbonLoadBalancerContext类是LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和Api操作(reconstructURIWithServer就是其中之一)。

    reconstructURIWithServer的实现中我们可以看到,它同reconstructURI的定义类似。只是reconstructURI的第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定义的Server,所以在RibbonLoadBalancerClient实现reconstructURI时候,做了一次转换,使用ServiceInstance的host和port信息来构建了一个Server对象来给reconstructURIWithServer使用。从reconstructURIWithServer的实现逻辑中,我们可以看到,它从Server对象中获取host和port信息,然后根据以服务名为host的URI对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。

    public class LoadBalancerContext implements IClientConfigAware {
    
        ...
    
        public URI reconstructURIWithServer(Server server, URI original) {
            String host = server.getHost();
            int port = server .getPort();
            if (host.equals(original.getHost())
                    && port == original.getPort()) {
                return original;
            }
            String scheme = original.getScheme();
            if (scheme == null) {
                scheme = deriveSchemeAndPortFromPartialUri(original).first();
            }
    
            try {
                StringBuilder sb = new StringBuilder();
                sb.append(scheme).append("://");
                if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                    sb.append(original.getRawUserInfo()).append("@");
                }
                sb.append(host);
                if (port >= 0) {
                    sb.append(":").append(port);
                }
                sb.append(original.getRawPath());
                if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                    sb.append("?").append(original.getRawQuery());
                }
                if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                    sb.append("#").append(original.getRawFragment());
                }
                URI newURI = new URI(sb.toString());
                return newURI;
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
    
        ...
    }
    

    另外,从RibbonLoadBalancerClientexecute的函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,ribbon还通过RibbonStatsRecorder对象对服务的请求还进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。

    分析到这里,我们已经可以大致理清Spring Cloud中使用Ribbon实现客户端负载均衡的基本脉络。了解了它是如何通过LoadBalancerInterceptor拦截器对RestTemplate的请求进行拦截,并利用Spring Cloud的负载均衡器LoadBalancerClient将以逻辑服务名为host的URI转换成具体的服务实例的过程。同时通过分析LoadBalancerClient的Ribbon实现RibbonLoadBalancerClient,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer接口的实现,自动化配置会采用ZoneAwareLoadBalancer的实例来进行客户端负载均衡实现。

    负载均衡器

    通过之前的分析,我们已经对Spring Cloud如何使用Ribbon有了基本的了解。虽然Spring Cloud中定义了LoadBalancerClient为负载均衡器的接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是它在具体实现客户端负载均衡时,则是通过Ribbon的ILoadBalancer接口实现。在上一节分析时候,我们对该接口的实现结构已经做了一些简单的介绍,下面我们根据ILoadBalancer接口的实现类逐个看看它都是如何实现客户端负载均衡的。

    AbstractLoadBalancer

    AbstractLoadBalancerILoadBalancer接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类ServerGroup,它包含了三种不同类型:ALL-所有服务实例、STATUS_UP-正常服务的实例、STATUS_NOT_UP-停止服务的实例;实现了一个chooseServer()函数,该函数通过调用接口中的chooseServer(Object key)实现,其中参数key为null,表示在选择具体服务实例时忽略key的条件判断;最后还定义了两个抽象函数,getServerList(ServerGroup serverGroup)定义了根据分组类型来获取不同的服务实例列表,getLoadBalancerStats()定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。

    public abstract class AbstractLoadBalancer implements ILoadBalancer {
    
        public enum ServerGroup{
            ALL,
            STATUS_UP,
            STATUS_NOT_UP
        }
    
        public Server chooseServer() {
            return chooseServer(null);
        }
    
        public abstract List<Server> getServerList(ServerGroup serverGroup);
    
        public abstract LoadBalancerStats getLoadBalancerStats();
    }
    

    BaseLoadBalancer

    BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义很多关于均衡负载器相关的基础内容:

    • 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> upServerList = Collections
            .synchronizedList(new ArrayList<Server>());
    
    • 定义了之前我们提到的用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。
    • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。
    • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当我们实现的IPing速度不理想,或是Server列表过大时,可能变的不是很为理想,这时候我们需要通过实现IPingStrategy接口并实现pingServers(IPing ping, Server[] servers)函数去扩展ping的执行策略。
    private static class SerialPingStrategy implements IPingStrategy {
        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
    
            if (logger.isDebugEnabled()) {
                logger.debug("LoadBalancer:  PingTask executing ["
                             + numCandidates + "] servers configured");
            }
    
            for (int i = 0; i < numCandidates; i++) {
                results[i] = false;
                try {
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Throwable t) {
                    logger.error("Exception while pinging Server:"
                                 + servers[i], t);
                }
            }
            return results;
        }
    }
    
    • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancerchooseServer(Object key)的实现源码,我们可以知道负载均衡器实际进行服务实例选择任务是委托给了IRule实例中的choose函数来实现。而在这里,默认初始化了RoundRobinRuleIRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Throwable t) {
                return null;
            }
        }
    }
    
    • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔为:10秒。
    • 实现了ILoadBalancer接口定义的负载均衡器应具备的一系列基本操作:
      • addServers(List newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是对setServersList函数的重写来实现的。
    public void addServers(List<Server> newServers) {
        if (newServers != null && newServers.size() > 0) {
            try {
                ArrayList<Server> newList = new ArrayList<Server>();
                newList.addAll(allServerList);
                newList.addAll(newServers);
                setServersList(newList);
            } catch (Exception e) {
                logger.error("Exception while adding Servers", e);
            }
        }
    }
    
    • chooseServer(Object key):挑选一个具体的服务实例,在上面介绍IRule的时候,已经做了说明,这里不再赘述。
    • markServerDown(Server server):标记某个服务实例暂停服务。
    public void markServerDown(Server server) {
        if (server == null) {
            return;
        }
        if (!server.isAlive()) {
            return;
        }
        logger.error("LoadBalancer:  markServerDown called on ["
                + server.getId() + "]");
        server.setAlive(false);
        notifyServerStatusChangeListener(singleton(server));
    }
    
    • getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。
    public List<Server> getReachableServers() {
        return Collections.unmodifiableList(upServerList);
    }
    
    • getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以也直接返回它即可。
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
    

    DynamicServerListLoadBalancer

    DynamicServerListLoadBalancer类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单的在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说我们可以通过过滤器来选择性的获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容:

    ServerList

    DynamicServerListLoadBalancer的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象:ServerList<T> serverListImpl。其中泛型T从类名中对于T的限定DynamicServerListLoadBalancer<T extends Server>可以获知它是一个Server的子类,即代表了一个具体的服务实例的扩展类。而ServerList接口定义如下所示:

    public interface ServerList<T extends Server> {
    
        public List<T> getInitialListOfServers();
    
        public List<T> getUpdatedListOfServers();
    }
    

    它定义了两个抽象方法:getInitialListOfServers用于获取初始化的服务实例清单,而getUpdatedListOfServers用于获取更新的服务实例清单。那么该接口的实现有哪些呢?通过搜索源码,我们可以整出如下图的结构:

    从图中我们可以看到有很多个ServerList的实现类,那么在DynamicServerListLoadBalancer中的ServerList默认配置到底使用了哪个具体实现呢?既然在该负载均衡器中需要实现服务实例的动态更新,那么势必需要ribbon具备访问eureka来获取服务实例的能力,所以我们从Spring Cloud整合ribbon与eureka的包org.springframework.cloud.netflix.ribbon.eureka下探索,可以找到配置类EurekaRibbonClientConfiguration,在该类中可以找到看到下面创建ServerList实例的内容:

    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config) {
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }
    

    可以看到,这里创建的是一个DomainExtractingServerList实例,从下面它的源码中我们可以看到在它内部还定义了一个ServerList list。同时,DomainExtractingServerList类中对getInitialListOfServersgetUpdatedListOfServers的具体实现,其实委托给了内部定义的ServerList list对象,而该对象是通过创建DomainExtractingServerList时候,由构造函数传入的DiscoveryEnabledNIWSServerList实现的。

    public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
    
        private ServerList<DiscoveryEnabledServer> list;
        private IClientConfig clientConfig;
        private boolean approximateZoneFromHostname;
    
        public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
                IClientConfig clientConfig, boolean approximateZoneFromHostname) {
            this.list = list;
            this.clientConfig = clientConfig;
            this.approximateZoneFromHostname = approximateZoneFromHostname;
        }
    
        @Override
        public List<DiscoveryEnabledServer> getInitialListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(this.list
                    .getInitialListOfServers());
            return servers;
        }
    
        @Override
        public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
            List<DiscoveryEnabledServer> servers = setZones(this.list
                    .getUpdatedListOfServers());
            return servers;
        }
        ...
    }
    

    那么DiscoveryEnabledNIWSServerList是如何实现这两个服务实例的获取的呢?我们从源码中可以看到这两个方法都是通过该类中的一个私有函数obtainServersViaDiscovery来通过服务发现机制来实现服务实例的获取。

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }
    
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }
    

    obtainServersViaDiscovery的实现逻辑如下,主要依靠EurekaClient从服务注册中心中获取到具体的服务实例InstanceInfo列表(EurekaClient的具体实现,我们在分析Eureka的源码时已经做了详细的介绍,这里传入的vipAddress可以理解为逻辑上的服务名,比如“USER-SERVICE”)。接着,对这些服务实例进行遍历,将状态为“UP”(正常服务)的实例转换成DiscoveryEnabledServer对象,最后将这些实例组织成列表返回。

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }
    
        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(
                            vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        // 省略了一些实例信息的加工逻辑
                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break;
                }
            }
        }
        return serverList;
    }
    

    DiscoveryEnabledNIWSServerList中通过EurekaClient从服务注册中心获取到最新的服务实例清单后,返回的List到了DomainExtractingServerList类中,将继续通过setZones函数进行处理,而这里的处理具体内容如下,主要完成将DiscoveryEnabledNIWSServerList返回的List列表中的元素,转换成内部定义的DiscoveryEnabledServer的子类对象DomainExtractingServer,在该对象的构造函数中将为服务实例对象设置一些必要的属性,比如id、zone、isAliveFlag、readyToServe等信息。

    private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
        List<DiscoveryEnabledServer> result = new ArrayList<>();
        boolean isSecure = this.clientConfig.getPropertyAsBoolean(
                CommonClientConfigKey.IsSecure, Boolean.TRUE);
        boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(
                CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }
    
    ServerListUpdater

    通过上面的分析我们已经知道了Ribbon与Eureka整合后,如何实现从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看DynamicServerListLoadBalancer中的实现内容,我们可以很容易的找到下面定义的关于ServerListUpdater的内容:

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    
    protected volatile ServerListUpdater serverListUpdater;
    

    根据该接口的命名,我们基本就能猜到,这个对象实现的是对ServerList的更新,所以可以称它为“服务更新器”,从下面的源码中可以看到,在ServerListUpdater内部还定义了一个UpdateAction接口,上面定义的updateAction对象就是以匿名内部类的方式创建了一个它的具体实现,其中doUpdate实现的内容就是对ServerList的具体更新操作。除此之外,“服务更新器”中还定义了一系列控制它和获取它一些信息的操作。

    public interface ServerListUpdater {
    
        public interface UpdateAction {
            void doUpdate();
        }
    
        // 启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。
        void start(UpdateAction updateAction);
    
        // 停止服务更新器
        void stop();
    
        // 获取最近的更新时间戳
        String getLastUpdate();
    
        // 获取上一次更新到现在的时间间隔,单位为毫秒
        long getDurationSinceLastUpdateMs();
    
        // 获取错过的更新周期数
        int getNumberMissedCycles();
    
        // 获取核心线程数
        int getCoreThreads();
    }
    

    ServerListUpdater的实现类不多,具体下图所示。

    根据两个类的注释,我们可以很容易的知道它们的作用分别是:

    • PollingServerListUpdater:动态服务列表更新的默认策略,也就是说DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。
    • EurekaNotificationServerListUpdater:该更新器也可服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。

    下面我们来详细看看它默认实现的PollingServerListUpdater。先从用于启动“服务更新器”的start函数源码看起,具体如下。我们可以看到start函数的实现内容验证了之前提到的:以定时任务的方式进行服务列表的更新。它先创建了一个Runnable的线程实现,在该实现中调用了上面提到的具体更新服务实例列表的方法updateAction.doUpdate(),最后再为这个Runnable的线程实现启动了一个定时任务来执行。

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
    
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }
    

    继续看PollingServerListUpdater的其他内容,我们可以找到用于启动定时任务的2个重要参数initialDelayMsrefreshIntervalMs的默认定义分别为1000和30*1000,单位为毫秒。也就是说更新服务实例在初始化之后延迟1秒后开始执行,并以30秒为周期重复执行。除了这些内容之外,我们还能看到它还会记录最后更新时间、是否存活等信息,同时也实现了ServerListUpdater中定义的一些其他操作内容,这些操作相对比较简单,这里不再具体说明,有兴趣的读者可以自己查看源码了解其实现原理。

    ServerListFilter

    在了解了更新服务实例的定时任务是如何启动的之后,我们继续回到updateAction.doUpdate()调用的具体实现位置,在DynamicServerListLoadBalancer中,它的实际实现委托给了updateListOfServers函数,具体实现如下:

    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
    
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
    

    可以看到,这里终于用到了我们之前提到的ServerListgetUpdatedListOfServers,通过之前的介绍我们已经可以知道这一步实现了从Eureka Server中获取服务可用实例的列表。在获得了服务实例列表之后,这里又将引入一个新的对象filter,追朔该对象的定义,我们可以找到它是ServerListFilter定义的。

    ServerListFilter接口非常简单,该接口中之定义了一个方法List getFilteredListOfServers(List servers),主要用于实现对服务实例列表的过滤,通过传入的服务实例清单,根据一些规则返回过滤后的服务实例清单。该接口的实现如下图所示:

    其中,除了ZonePreferenceServerListFilter的实现是Spring Cloud Netflix中对Ribbon的扩展实现外,其他均是Netflix Ribbon中的实现类。我们可以分别看看这些过滤器实现都有什么特点:

    • AbstractServerListFilter:这是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象LoadBalancerStats,我们在之前介绍过的,该对象存储了关于负载均衡器的一些属性和统计信息等。
    public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> {
    
        private volatile LoadBalancerStats stats;
    
        public void setLoadBalancerStats(LoadBalancerStats stats) {
            this.stats = stats;
        }
    
        public LoadBalancerStats getLoadBalancerStats() {
            return stats;
        }
    }
    
    • ZoneAffinityServerListFilter:该过滤器基于“区域感知(Zone Affinity)”的方式实现服务实例的过滤,也就是说它会根据提供服务的实例所处区域(Zone)与消费者自身的所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例。
    public List<T> getFilteredListOfServers(List<T> servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List<T> filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }
    

    从上面的源码中我们可以看到,对于服务实例列表的过滤是通过Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,其中判断依据由ZoneAffinityPredicate实现服务实例与消费者的Zone比较。而在过滤之后,这里并不会马上返回过滤的结果,而是通过shouldEnableZoneAffinity函数来判断是否要启用“区域感知”的功能,从下面shouldEnableZoneAffinity的实现中,我们可以看到,它使用了LoadBalancerStatsgetZoneSnapshot方法来获取这些过滤后的同区域实例的基础指标(包含了:实例数量、断路器断开数、活动请求数、实例平均负载等),根据一系列的算法求出下面的几个评价值并与设置的阈值对比(下面的为默认值),若有一个条件符合,就不启用“区域感知”过滤的服务实例清单。这一算法实现对于集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。同时,通过这里的介绍,我们也可以关联着来理解之前介绍Eureka的时候提到的对于区域分配设计来保证跨区域故障的高可用问题。

    • blackOutServerPercentage:故障实例百分比(断路器断开数 / 实例数量) >= 0.8
    • activeReqeustsPerServer:实例平均负载 >= 0.6
    • availableServers:可用实例数(实例数量 - 断路器断开数) < 2
    private boolean shouldEnableZoneAffinity(List<T> filtered) {
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
                || loadPerServer >= activeReqeustsPerServerThreshold.get()
                || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
                new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
        }
    }
    
    • DefaultNIWSServerListFilter:该过滤器完全继承自ZoneAffinityServerListFilter,是默认的NIWS(Netflix Internal Web Service)过滤器。

    • ServerListSubsetFilter:该过滤器也继承自ZoneAffinityServerListFilter,它非常适用于拥有大规模服务器集群(上百或更多)的系统。因为它可以产生一个“区域感知”结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性的从服务实例列表中剔除那些相对不够健康的实例。该过滤器的实现主要分为三步:

      1. 获取“区域感知”的过滤结果,来作为候选的服务实例清单
      2. 从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时也将这些实例从候选清单中剔除,防止第三步的时候又被选入),不够健康的标准如下:
        a. 服务实例的并发连接数超过客户端配置的值,默认为0,配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold
        b. 服务实例的失败数超过客户端配置的值,默认为0,配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold
        c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent。那么就先对剩下的实例列表进行健康排序,再开始从最不健康实例进行剔除,直到达到配置的剔除百分比。
      3. 在完成剔除后,清单已经少了至少10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,而默认的实例子集数量为20,其配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.size
    • ZonePreferenceServerListFilter:Spring Cloud整合时新增的过滤器。若使用Spring Cloud整合Eureka和Ribbon时会默认使用该过滤器。它实现了通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。如下面的源码所示,它的实现非常简单,首先通过父类ZoneAffinityServerListFilter的过滤器来获得“区域感知”的服务实例列表,然后遍历这个结果取出根据消费者配置预设的区域Zone来进行过滤,如果过滤的结果是空的就直接返回父类获取的结果,如果不为空就返回通过消费者配置的Zone过滤后的结果。

    @Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List<Server> local = new ArrayList<Server>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }
    

    ZoneAwareLoadBalancer

    ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在DynamicServerListLoadBalancer中,我们可以看到它并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用在BaseLoadBalancer中实现的算法,使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以它会把所有实例视为一个Zone下的节点来看待,这样就会周期性的产生跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的情况下会有一定的性能问题,而该负载均衡器则可以避免这样的问题。那么它是如何实现的呢?

    首先,在ZoneAwareLoadBalancer中,我们可以发现,它并没有重写setServersList,说明实现服务实例清单的更新主逻辑没有修改。但是我们可以发现它重写了这个函数:setServerListForZones(Map<String, List<Server>> zoneServersMap)。看到这里可能会有一些陌生,因为它并不是接口中定义的必备函数,所以我们不妨去父类DynamicServerListLoadBalancer中寻找一下该函数,我们可以找到下面的定义了:

    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List<T> serverList = (List<T>) lsrv;
        Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
        ...
        setServerListForZones(serversInZones);
    }
    
    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }
    

    setServerListForZones函数的调用位于更新服务实例清单函数setServersList的最后,同时从其实现内容来看,它在父类DynamicServerListLoadBalancer中的作用是根据按区域Zone分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneStatsMap集合中,每一个区域Zone会对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。

    ZoneAwareLoadBalancer中对setServerListForZones的重写如下:

    protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
        }
        for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }
    

    可以看到,在该实现中创建了一个ConcurrentHashMap()类型的balancers对象,它将用来存储每个Zone区域对应的负载均衡器,而具体的负载均衡器的创建则是通过下面的第一个循环中调用getLoadBalancer函数来完成,同时在创建负载均衡器的时候会创建它的规则(如果当前实现中没有IRULE的实例,就创建一个AvailabilityFilteringRule规则;如果已经有具体实例,就clone一个),在创建完负载均衡器后又马上调用setServersList函数为其设置对应Zone区域的实例清单。而第二个循环则是对Zone区域中实例清单的检查,看看是否有Zone区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。

    在了解了该负载均衡器是如何扩展服务实例清单的实现后,我们来具体看看它是如何挑选服务实例,来实现对区域的识别的:

    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            ...
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Throwable e) {
            logger.error("Unexpected exception when choosing server using zone aware logic", e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }
    

    从源码中我们可以看到,只有当负载均衡器中维护的实例所属Zone区域个数大于1的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域个数大于1个的时候,它的实现步骤主要如下:

    • 调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats),为当前负载均衡器中所有的Zone区域分别创建快照,保存在Map zoneSnapshot中,这些快照中的数据将用于后续的算法。
    • 调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选。
      • 首先它会剔除符合这些规则的Zone区域:所属实例数为零的Zone区域;Zone区域内实例平均负载小于零,或者实例故障率(断路器断开次数/实例数)大于等于阈值(默认为0.99999)。
      • 然后根据Zone区域的实例平均负载来计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域。
      • 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阈值(默认为20%),就直接返回所有Zone区域为可用区域。否则,从最坏Zone区域集合中随机的选择一个,将它从可用Zone区域集合中剔除。
    • 当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机的选择一个Zone区域。
    • 在确定了某个Zone区域后,则获取对应Zone区域的服务均衡器,并调用chooseServer来选择具体的服务实例,而在chooseServer中将使用IRule接口的choose函数来选择具体的服务实例。在这里IRule接口的实现会使用ZoneAvoidanceRule来挑选出具体的服务实例。

    相关文章

      网友评论

      本文标题:Spring Cloud源码分析(二)Ribbon

      本文链接:https://www.haomeiwen.com/subject/cwjzyttx.html