美文网首页Spring cloud
客户端负载均衡:Spring Cloud Ribbon

客户端负载均衡:Spring Cloud Ribbon

作者: 突突兔007 | 来源:发表于2018-10-10 23:07 被阅读23次

    Spring Cloud Ribbon属于客户端负载均衡,服务端负载均衡和客户端负载均衡最大的不同点在于服务清单所存储的位置。服务端的清单全部来自于服务注册中心。通服务端负载均衡的架构类似,在客户端负载均衡中也需要心跳去维护服务清单的健康性。

    常用的方法

    GET,POST,DELETE,PUT,HEADER,OPTIONS 常用请求如下:

    常用方法
    xxxForEntity(String url, Class<T> responseType, Object... uriVariables)
    xxxForEntity(String url, Class<T> responseType, Map<String, ?> uriVariables)
    xxxForEntity(URI url, Class<T> responseType)
    

    我们可以看到基本上就是这三种访问方式

    1. 第一种url:请求的地址,responseType为请求响应体包装的类型,uriVariables为url中的绑定的参数

    源码分析

    我们可以查看@LoadBalance源码

    /**
     * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
     * @author Spencer Gibb
     */
    @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Qualifier
    public @interface LoadBalanced {
    }
    

    通过查看我们可以知道该注解是用来给RestTemplate做一个标记,然后使用负载均衡器LoadBalancerClient
    查看LoadBalancerClient源码

    /**
     * Represents a client side load balancer
     * @author Spencer Gibb
     */
    public interface LoadBalancerClient extends ServiceInstanceChooser {
            /**
         * 
         * 使用从负载均衡器中挑选出的服务实例来执行
         * @param serviceId the service id to look up the LoadBalancer
         * @param request allows implementations to execute pre and post actions such as
         * incrementing metrics
         * @return the result of the LoadBalancerRequest callback on the selected
         * ServiceInstance
         */
        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    
        <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
    
        URI reconstructURI(ServiceInstance instance, URI original);
    }
    

    从该接口中,我们可以清楚的知道

    1. 客户端负载均衡器来执行请求内容
    2. 为系统构建一个合适的host:port形式的URI。

    org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration通过内部静态类LoadBalancerInterceptorConfig创建org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,该类被注入了LoadBalancerClient接口。
    我们可以查看LoadBalancerAutoConfiguration类的源码

    /**
     * Auto configuration for Ribbon (client side load balancing).
     *
     * @author Spencer Gibb
     * @author Dave Syer
     * @author Will Tran
     * @author Gang Li
     */
    @Configuration
    @ConditionalOnClass(RestTemplate.class)
    @ConditionalOnBean(LoadBalancerClient.class)
    @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
    public class LoadBalancerAutoConfiguration {
    .....省略........
    }
    

    从该类注释我们可以知道该类为客户端负载均衡器Ribbon提供自动配置,Ribbon实现的负载均衡自动化配置需要满足下面两个条件

    1. @ConditionalOnClass(RestTemplate.class)类路径必须存在于当前路径中
    2. @ConditionalOnBean(LoadBalancerClient.class)在Spring 的Bean工程中必须有LoadBalancerClient的实现Bean。
      在该类中,主要做了下面三件事
    • 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡
    • 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加loadBalancerInterceptor拦截器
    • 维护了一个被@LoadBalance注解修饰过的,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor。
          接下来我们看看LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:
    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
    }
    

    通过源码我们知道,当一个被@LoadBalance注解修饰过的RestTemplate对象向外发起Http请求时,会被LoadBalanceInterceptor类的interceptor方法拦截。由于我们在使用RestTemplate时采用了服务名做为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute()方法去根据服务名来选择实例并发起实际的请求。
    分析到这里,LoadBalanceClient还只是一个抽象的负载均衡器接口,我们可以查看具体的实现,来进一步分析负载均衡的策略。通过源码我们查找到 org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient

    public class RibbonLoadBalancerClient implements LoadBalancerClient {
    ...........................省略...............................
    
        @Override
        public ServiceInstance choose(String serviceId) {
            Server server = getServer(serviceId);
            if (server == null) {
                return null;
            }
            return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                    serverIntrospector(serviceId).getMetadata(server));
        }
    
        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                    serviceId), serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    
        @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if(serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer)serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            catch (IOException ex) {
                statsRecorder.recordStats(ex);
                throw ex;
            }
            catch (Exception ex) {
                statsRecorder.recordStats(ex);
                ReflectionUtils.rethrowRuntimeException(ex);
            }
            return null;
        }
        private ServerIntrospector serverIntrospector(String serviceId) {
            ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
                    ServerIntrospector.class);
            if (serverIntrospector == null) {
                serverIntrospector = new DefaultServerIntrospector();
            }
            return serverIntrospector;
        }
    
        private boolean isSecure(Server server, String serviceId) {
            IClientConfig config = this.clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            return RibbonUtils.isSecure(config, serverIntrospector, server);
        }
    
        protected Server getServer(String serviceId) {
            return getServer(getLoadBalancer(serviceId));
        }
        protected Server getServer(ILoadBalancer loadBalancer) {
            if (loadBalancer == null) {
                return null;
            }
            return loadBalancer.chooseServer("default"); // TODO: better handling of key
        }
    
        protected ILoadBalancer getLoadBalancer(String serviceId) {
            return this.clientFactory.getLoadBalancer(serviceId);
        }
        public static class RibbonServer implements ServiceInstance {
            private final String serviceId;
            private final Server server;
            private final boolean secure;
            private Map<String, String> metadata;
        }
       ...........................省略...............................
    }
    

    可以看到execute()方法中的getServer(loadBalancer)方法,从负载均衡器中获取服务,返回Server对象,

    protected Server getServer(ILoadBalancer loadBalancer) {
        if (loadBalancer == null) {
            return null;
        }
        return loadBalancer.chooseServer("default"); // TODO: better handling of key
    }
    

    可以看到getServer()方法传入的是ILoadBalancer 接口,没有使用LoadBalanceClient接口中的choose()方法,我们可以看看ILoadBalance接口的相关类图


    BaseLoadBalancer实现了基础的负载均衡,我们在此类中可以看到,如果我们不配置自己的负载均衡策略的话,默认使用了RoundRobinRule()实现轮训负载均衡。而DynamicServerListLoadBalancerZoneAwareLoadBalancer在负载均衡的策略上做了一些功能的扩展。

    getServer()方法返回的Server对象定义了一些传统的服务端节点,该类中存储了服务端节点的元数据,包括host ,port等。如下:

    public class Server {
    
        /**
         * Additional meta information of a server, which contains
         * information of the targeting application, as well as server identification
         * specific for a deployment environment, for example, AWS.
         */
        public static interface MetaInfo {
            /**
             * @return the name of application that runs on this server, null if not available
             */
            public String getAppName();
    
            /**
             * @return the group of the server, for example, auto scaling group ID in AWS.
             * Null if not available
             */
            public String getServerGroup();
    
            /**
             * @return A virtual address used by the server to register with discovery service.
             * Null if not available
             */
            public String getServiceIdForDiscovery();
    
            /**
             * @return ID of the server
             */
            public String getInstanceId();
        }
    
        public static final String UNKNOWN_ZONE = "UNKNOWN";
        private String host;
        private int port = 80;
        private String scheme;
        private volatile String id;
        private volatile boolean isAliveFlag;
        private String zone = UNKNOWN_ZONE;
        private volatile boolean readyToServe = true;
    
        private MetaInfo simpleMetaInfo = new MetaInfo() {
           ...省略get set...
        };
    

    那么我们在整合Ribbon的时候,Spring Cloud默认采用了那个具体的实现,我们可以通过RibbonClientConfiguration配置类,可以知道在默认时采用了ZoneAwareLoadBalancer来实现负载均衡器。

    @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
    

    我们可以看到在返回的对象中有个rule参数,我们可以一直追踪下去,直到看到如下代码

     /* Ignore null rules */
        public void setRule(IRule rule) {
            if (rule != null) {
                this.rule = rule;
            } else {
                /* default rule */
                this.rule = new RoundRobinRule();
            }
            if (this.rule.getLoadBalancer() != this) {
                this.rule.setLoadBalancer(this);
            }
        }
    

    这表示负载均衡器默认使用RoundRobinRule()规则,也就是我们所说的轮询(轮询的原理大家可以看源码或者自行百度,很简单)。
    我们继续看RibbonLoadBalancerClient的execute方法,

    在通过getServer(loadBalancer)的时候,也就是ZoneAwareLoadBalancer的choose()方法获取了负载均衡策略分配到的服务实例对象Server后,将其包装成RibbonServer对象(该对象除了存储了服务实例的信息之外),然后使用该对象在回调LoadBalanceInterceptor请求拦截器中LoadBalanceRequest的apply(final ServiceInstance instance)函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求到host:port形式的实际访问地址的转换。

    负载均衡器

    Spring Cloud 中使用LoadBalanceClient作为负载均衡器的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是他在具体实现客户端负载均衡时,时通过Ribbon的ILoadBalancer接口实现的,下面我们根据ILoadBalancer接口的实现类逐个看看它是如何实现客户端负载均衡的。


    com.netflix.loadbalancer.AbstractLoadBalancer

    com.netflix.loadbalancer.BaseLoadBalancer

    BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义了很多有关负载均衡器相关的内容。

    • 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务列表,一个用于存储正常服务的实例清单。
        @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> allServerList = Collections
                .synchronizedList(new ArrayList<Server>());
        @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
        protected volatile List<Server> upServerList = Collections
                .synchronizedList(new ArrayList<Server>());
    
    • 定义了之前我们提到的用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。
    • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。
    • 定义了检查服务实例操作的执行策略对象IPingStrategy接口,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用了顺序遍历ping服务实例的方法实现检查。该策略在当IPing的实现速度慢,或者Server列表过大时,可能会影响系统性能,这时候需要通过实现IPingStrategy接口并重写pingServer(IPing ping,Server[] servers)函数去扩展ping的执行策略。
        /**
         * Default implementation for <c>IPingStrategy</c>, performs ping
         * serially, which may not be desirable, if your <c>IPing</c>
         * implementation is slow, or you have large number of servers.
         */
        private static class SerialPingStrategy implements IPingStrategy {
    
            @Override
            public boolean[] pingServers(IPing ping, Server[] servers) {
                int numCandidates = servers.length;
                boolean[] results = new boolean[numCandidates];
    
                logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);
    
                for (int i = 0; i < numCandidates; i++) {
                    results[i] = false; /* Default answer is DEAD. */
                    try {
                        // NOTE: IFF we were doing a real ping
                        // assuming we had a large set of servers (say 15)
                        // the logic below will run them serially
                        // hence taking 15 times the amount of time it takes
                        // to ping each server
                        // A better method would be to put this in an executor
                        // pool
                        // But, at the time of this writing, we dont REALLY
                        // use a Real Ping (its mostly in memory eureka call)
                        // hence we can afford to simplify this design and run
                        // this
                        // serially
                        if (ping != null) {
                            results[i] = ping.isAlive(servers[i]);
                        }
                    } catch (Exception e) {
                        logger.error("Exception while pinging Server: '{}'", servers[i], e);
                    }
                }
                return results;
            }
        }
    
    • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancer中chooseServer(Object key)的实现源码,我们可以知道,负载均衡器实际将服务实例委托给了IRule实例中的choose函数。而在这里,默认初始化了RoundRobbinRule为IRule的实现对象。RoundRobbinRule实现了最基本的线性负载均衡规则。
    public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }
    
            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
    
                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
    
                int nextServerIndex = incrementAndGetModulo(serverCount);
                server = allServers.get(nextServerIndex);
    
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
    
            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }
    
    • 启动ping 任务:在BaseLoadBalancer的默认构造函数中,会直接启动给一个用于定时检测Server是否健康的服务。该任务时间间隔为10s
     void setupPingTask() {
            if (canSkipPing()) {
                return;
            }
            if (lbTimer != null) {
                lbTimer.cancel();
            }
            lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                    true);
            lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
            forceQuickPing();
        }
    

    我们继续追踪源码可以看到,

     /**
         * TimerTask that keeps runs every X seconds to check the status of each
         * server/node in the Server List
         * 
         * @author stonse
         * 
         */
        class PingTask extends TimerTask {
            public void run() {
                try {
                    new Pinger(pingStrategy).runPinger();
                } catch (Exception e) {
                    logger.error("LoadBalancer [{}]: Error pinging", name, e);
                }
            }
        }
    

    负载均衡策略

    我们解读一下IRule接口。


    com.netflix.loadbalancer.AbstractLoadBalancerRule

    该类中定义了负载均衡器的抽象接口ILoadBalancer对象,该对象在具体选择服务策略时,获取到一写负载均衡器中维护的信息作为分配依据,并以此设计一些算法来实现特定场景的高效策略。

    /**
     * Class that provides a default implementation for setting and getting load balancer
     * @author stonse
     *
     */
    public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
    
        private ILoadBalancer lb;
            
        @Override
        public void setLoadBalancer(ILoadBalancer lb){
            this.lb = lb;
        }
        
        @Override
        public ILoadBalancer getLoadBalancer(){
            return lb;
        }      
    }
    

    如果我们不满足系统默认的集中实现,也可以自定义ILoadBalancer,很简单,如下操作,自定义实现的原理后面再说。

    /**
     * 在这里自定义自己的LoadBalancer
     * 如果需要使用自定义的LoadBalancer,在此类上加上@Configuration,
     * 返回的ILoadBalancer是实现了ILoadBalancer接口的实现类即可
     */
    @Configuration
    public class LoadBalancerConfiguration {
    
        @Bean
        public ILoadBalancer myLoadBalancer(){
            return new TestLoadBalnacer();
        }
    
        class TestLoadBalnacer implements ILoadBalancer{
          ......省略......
        }
    }
    

    com.netflix.loadbalancer.RandomRule

    /**
     * A loadbalacing strategy that randomly distributes traffic amongst existing
     * servers.
     * 
     * @author stonse
     * 
     */
    public class RandomRule extends AbstractLoadBalancerRule
    

    从该类的命名和注释我们可以知道该类是一个负载均衡策略,一种从服务实例清单中随机选择一个服务实例的功能。我们可以看到该类实现了抽象类的choose(Object key)方法,委托给了该类中的choose(ILoadBalancer lb, Object key) 方法,该对象多了一个负载均衡器参数ILoadBalancer 。该负载均衡器对象提供一个可用的服务实例列表lb.getReachableServers()和所有服务实例列表lb.getAllServers(),通过
    int index = rand.nextInt(serverCount)获得一个随机数,通过随机数在获取服务Server server = upList.get(index)

    /**
         * Randomly choose from all living servers
         */
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;
            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
    
                int serverCount = allList.size();
                if (serverCount == 0) {
                    /*
                     * No servers. End regardless of pass, because subsequent passes
                     * only get more restrictive.
                     */
                    return null;
                }
                int index = rand.nextInt(serverCount);
                server = upList.get(index);
    
                if (server == null) {
                    /*
                     * The only time this should happen is if the server list were
                     * somehow trimmed. This is a transient condition. Retry after
                     * yielding.
                     */
                    Thread.yield();
                    continue;
                }
                if (server.isAlive()) {
                    return (server);
                }
                // Shouldn't actually happen.. but must be transient or a bug.
                server = null;
                Thread.yield();
            }
            return server;
        }
    

    com.netflix.loadbalancer.RoundRobinRule

    最广为人知和最基本的负载平衡策略,即轮询规则。该策略实现了按照线性轮询的方式依次选择每个服务实例的功能。

    /**
     * The most well known and basic load balancing strategy, i.e. Round Robin Rule.
     *
     * @author stonse
     * @author Nikos Michalakis <nikos@netflix.com>
     *
     */
    public class RoundRobinRule extends AbstractLoadBalancerRule {
    /**
         * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
         *
         * @param modulo The modulo to bound the value of the counter.
         * @return The next value.
         */
        private int incrementAndGetModulo(int modulo) {
            for (;;) {
                int current = nextServerCyclicCounter.get();
                int next = (current + 1) % modulo;
                if (nextServerCyclicCounter.compareAndSet(current, next))
                    return next;
            }
        }
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }
    
            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();
    
                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }
    
                int nextServerIndex = incrementAndGetModulo(serverCount);
                server = allServers.get(nextServerIndex);
    
                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }
    
                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }
    
                // Next.
                server = null;
            }
    
            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }
    
    }
    

    可以看到这里实现的方式和RandomRule很类似,区别在于选择服务的逻辑上。这里采用轮询方式选择服务,轮询的原理这里就不在赘述了。不清楚的,百度一下,这里给出相关代码,自己领悟


    轮询关键代码

    com.netflix.loadbalancer.RetryRule

    带有重试机制的实例选择功能的负载均衡策略。
    关键代码如下:

    /*
         * Loop if necessary. Note that the time CAN be exceeded depending on the
         * subRule, because we're not spawning additional threads and returning
         * early.
         */
        public Server choose(ILoadBalancer lb, Object key) {
            long requestTime = System.currentTimeMillis();
            long deadline = requestTime + maxRetryMillis;
    
            Server answer = null;
    
            answer = subRule.choose(key);
    
            if (((answer == null) || (!answer.isAlive()))
                    && (System.currentTimeMillis() < deadline)) {
    
                InterruptTask task = new InterruptTask(deadline
                        - System.currentTimeMillis());
    
                while (!Thread.interrupted()) {
                    answer = subRule.choose(key);
    
                    if (((answer == null) || (!answer.isAlive()))
                            && (System.currentTimeMillis() < deadline)) {
                        /* pause and retry hoping it's transient */
                        Thread.yield();
                    } else {
                        break;
                    }
                }
    
                task.cancel();
            }
    
            if ((answer == null) || (!answer.isAlive())) {
                return null;
            } else {
                return answer;
            }
        }
    

    com.netflix.loadbalancer.WeightedResponseTimeRule

    该策略集成RoundRobinRule,增加了根据实例的运行情况来计算权重,并根据权重来选择实例,已达到最佳的选择效果。有兴趣的读者自己可以查看源码。这里不做过多分析。

    com.netflix.loadbalancer.RetryRule

    com.netflix.loadbalancer.ClientConfigEnabledRoundRobinRule

    /**
     * This class essentially contains the RoundRobinRule class defined in the
     * loadbalancer package
     * 
     * @author stonse
     * 
     */
    public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
    
        RoundRobinRule roundRobinRule = new RoundRobinRule();
    
        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            roundRobinRule = new RoundRobinRule();
        }
    
        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            roundRobinRule.setLoadBalancer(lb);
        }
        
        @Override
        public Server choose(Object key) {
            if (roundRobinRule != null) {
                return roundRobinRule.choose(key);
            } else {
                throw new IllegalArgumentException(
                        "This class has not been initialized with the RoundRobinRule class");
            }
        }
    
    }
    

    该类内部定义了RoundRobinRule策略,内部的choose()函数也是使用了RoundRobinRule 的choose()函数实现的线性轮询,所以该类实现的功能实际和RoundRobinRule一样。一般情况下,我们会继承该类,在子类中做一些高级策略时通常有可能会存在一些无法实施的情况,那么久可以用父类的实现,这里只的就是该类的choose()函数,

    com.netflix.loadbalancer.BestAvailableRule

    该策略继承ClientConfigEnabledRoundRobinRule ,在实现中注入负载均衡器统计对象LoadBalancerStats ,在该类的choose()函数中,我们可以发现,if (loadBalancerStats == null) { return super.choose(key);},这就是继承ClientConfigEnabledRoundRobinRule 的用处所在。我们可以看到该策略拿到负载均衡器中的所有服务实例,然后过滤掉有故障的实例,在找出并发数最小的服务实例。所以该策略是找出服务器中最空闲的服务实例。

    /**
     * A rule that skips servers with "tripped" circuit breaker and picks the
     * server with lowest concurrent requests.
     * <p>
     * This rule should typically work with {@link ServerListSubsetFilter} which puts a limit on the 
     * servers that is visible to the rule. This ensure that it only needs to find the minimal 
     * concurrent requests among a small number of servers. Also, each client will get a random list of 
     * servers which avoids the problem that one server with the lowest concurrent requests is 
     * chosen by a large number of clients and immediately gets overwhelmed.
     * 
     * @author awang
     *
     */
    public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
        private LoadBalancerStats loadBalancerStats;
        @Override
        public Server choose(Object key) {
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            List<Server> serverList = getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            for (Server server: serverList) {
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            if (lb instanceof AbstractLoadBalancer) {
                loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();            
            }
        }
    }
    

    com.netflix.loadbalancer.PredicateBasedRule

    可以看到该类是一个抽象类,里面包含抽象方法public abstract AbstractServerPredicate getPredicate();
    该类通过子类实现父类的抽象方法来提供过滤功能,然后在以线性轮询的方式从过滤后的清单中选出一个实例。所以至于该类如何过滤完全有子类实现。

    com.netflix.loadbalancer.AvailabilityFilteringRule

    可以看到该类继承了PredicateBasedRule 类并实现了抽象方法完成过滤功能。其中过滤逻辑如下:

        @Override
        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }
        private boolean shouldSkipServer(ServerStats stats) {        
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }
    

    在choose方法中,我们可以看到,该策略先线性轮询先选一个,在判断选出的实例是否满足条件(AvailabilityPredicate的apply()方法)来选出可用且比较空闲的实例。
    通过查看源码我们知道,该逻辑判断了当前服务的断路器是否生效打开,实例的并发请求书是否大于Integer.MAX_VALUE也就是231-1,如果有一项满足条件,就便是该节点可能存在故障活着负载过高,所以apply方法返回false,在父类的choose()方法中,如果方法false,就累加计数器,如果计数器大于等于10,就调用父类线性轮询。

    public class AvailabilityFilteringRule extends PredicateBasedRule {    
    
        private AbstractServerPredicate predicate;
        
        public AvailabilityFilteringRule() {
            super();
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }
        
        
        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                        .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                        .build();
        }
    
        @Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)
        public int getAvailableServersCount() {
            ILoadBalancer lb = getLoadBalancer();
            List<Server> servers = lb.getAllServers();
            if (servers == null) {
                return 0;
            }
            return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
        }
    
    
        /**
         * This method is overridden to provide a more efficient implementation which does not iterate through
         * all servers. This is under the assumption that in most cases, there are more available instances 
         * than not. 
         */
        @Override
        public Server choose(Object key) {
            int count = 0;
            Server server = roundRobinRule.choose(key);
            while (count++ <= 10) {
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                server = roundRobinRule.choose(key);
            }
            return super.choose(key);
        }
    
        @Override
        public AbstractServerPredicate getPredicate() {
            return predicate;
        }
    }
    

    com.netflix.loadbalancer.ZoneAvoidanceRule

    该策略模式使用了com.netflix.loadbalancer.CompositePredicate来过滤服务实例清单,这是一个组合顾虑条件,如图

    复合过滤器
    主滤条件ZoneAvoidancePredicate
    次过滤条件AvailabilityPredicate,查看CompositePredicate.withPredicates(p1, p2) .addFallbackPredicate(p2).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();当中的addFallbackPredicate(p2)方法,我们发现次方法返回的是一个Builder.List<AbstractServerPredicate>,所以我们断定次过滤条件可用有多个且是按顺序执行的。

    自动化配置

    在引入Spring CloudRibbon之后,能够自动化构建下面的接口。

    • IClientConfig:Ribbon的客户端,默认采用com.netflix.client.config.DefaultClientConfigImpl实现。
    • IRule:负载均衡器默认实现ZoneAvoidanceRule,该策略能够在多区域环境下选出最佳的实例进行访问。
    • ServerList<Server>:服务实例清单维护机制,默认实现ConfigurationBasedServerList。
      -ILoadBalancer:负载均衡器,默认采用ZoneAwareLoadBalancer实现,具备区域感知能力。
      上面的自动化配置仅在没有引入Spring Cloud Eureka等服务治理框架的时候如此,在同时引入Eureka和Ribbon依赖时,自动化会有写不同。

               通过自动化配置的实现。我们可以轻松的实现客户端的负载均衡,同时我们也可以替换这些默认实现。只需要在Spring Boot应用中创建对应的实现类覆盖默认的实现方法即可。比如下面的内容,

    /**
     * 在这里自定义自己的LoadBalancer
     * 如果需要使用自定义的LoadBalancer,在此类上加上@Configuration,
     * 返回的ILoadBalancer是实现了ILoadBalancer接口的实现类即可
     */
    @Configuration
    public class LoadBalancerConfiguration {
    
        @Bean
        public ILoadBalancer myLoadBalancer(){
            return new TestLoadBalnacer();
        }
    }
    

    另外也可以使用@RibbonClient注解来实现更细粒度的客户端配置,比如下面的代码实现了为hello-service服务使用HelloServiceConfiguration中的配置

    @Configuration
    @RibbonClient(name = "hello-service",configuration = HelloServiceConfiguration.class)
    public class RibbonConfiguration {
    }
    

    相关文章

      网友评论

        本文标题:客户端负载均衡:Spring Cloud Ribbon

        本文链接:https://www.haomeiwen.com/subject/acpznftx.html