Ribbon Source Code Analysis

Author: 拥抱孤独_to | Published: 2020-08-03 10:33

    Ribbon is very simple to use: register a RestTemplate bean annotated with @LoadBalanced in the Spring container and it can be used directly.

    
        @LoadBalanced
        @Bean
        public RestTemplate restTemplate() {
            return new RestTemplate();
        }
    

    First, write a service for the client to call:

    @RestController
    public class RibbonService {
    
        @Value("${server.port}")
        private String port;
    
        @GetMapping("/info")
        public String nameAndPort() {
            return "ribbon-server : "+port;
        }
    }
    

    Server-side application.properties:

    spring.application.name=ribbon-server
    server.port=8081
    

    The client calls the service directly:

    @RestController
    public class ClientController {
        @Autowired
        RestTemplate restTemplate;
    
        @GetMapping("/test")
        public String test() {
            return restTemplate.getForObject("http://ribbon-server/info",String.class);
        }
    }
    

    Client-side application.properties, where we configure the server's address:

    server.port=8080
    spring.application.name=ribbon-client
    ribbon-server.ribbon.listOfServers=localhost:8081
    

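    After that, requesting the local /test endpoint succeeds.

    With the simple configuration above, RestTemplate can already be used for calls between services. Ribbon's other configuration options are listed here: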

    # Disable Eureka
    ribbon.eureka.enabled=false
    # This setting is per service; the prefix is the service name
    <server-application-name>.ribbon.listOfServers=localhost:8081,localhost:8083
    # Connection timeout (ms)
    ribbon.ConnectTimeout=2000
    # Read (request processing) timeout (ms)
    ribbon.ReadTimeout=5000
    # Timeouts can also be set per Ribbon client by prefixing the service name:
    <server-application-name>.ribbon.ConnectTimeout=2000
    <server-application-name>.ribbon.ReadTimeout=5000
    # Maximum total connections
    ribbon.MaxTotalConnections=500
    # Maximum connections per host
    ribbon.MaxConnectionsPerHost=500
    # Refresh interval of the Ribbon server list (ms); the default is 30s
    ribbon.ServerListRefreshInterval=30000
    # Load balancer implementation; must implement ILoadBalancer
    <server-application-name>.ribbon.NFLoadBalancerClassName=
    # Load-balancing algorithm; must implement IRule
    <server-application-name>.ribbon.NFLoadBalancerRuleClassName=
    # Server availability check; must implement IPing
    <server-application-name>.ribbon.NFLoadBalancerPingClassName=
    # Server list retrieval; must implement ServerList
    <server-application-name>.ribbon.NIWSServerListClassName=
    # Server list filtering; must implement ServerListFilter
    <server-application-name>.ribbon.NIWSServerListFilterClassName=
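
    The relationship between the global ribbon.* keys and the per-service <name>.ribbon.* keys can be pictured as a two-step lookup in which the service-specific key wins. The sketch below is a plain-JDK illustration of that idea only; RibbonPropertySketch and resolve are hypothetical names, not part of Ribbon.

```java
import java.util.Properties;

/** Hypothetical illustration of "client-specific key first, global key second". */
final class RibbonPropertySketch {

    /** Returns the per-client value if present, otherwise the global one (or null). */
    static String resolve(Properties props, String clientName, String key) {
        String specific = props.getProperty(clientName + ".ribbon." + key);
        return specific != null ? specific : props.getProperty("ribbon." + key);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("ribbon.ReadTimeout", "5000");
        props.setProperty("ribbon-server.ribbon.ReadTimeout", "3000");
        System.out.println(resolve(props, "ribbon-server", "ReadTimeout"));
        System.out.println(resolve(props, "another-service", "ReadTimeout"));
    }
}
```

    With these two properties, ribbon-server resolves to its own value while any other client falls back to the global one.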
    

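    Ribbon ships with the following load-balancing rules:

    1. BestAvailableRule
      Chooses the server with the fewest concurrent requests. It examines the servers one by one, skips any that is marked as failed, and then picks the one with the smallest ActiveRequestCount.
    2. AvailabilityFilteringRule
      Filters out back-end servers that keep failing to connect and are marked as circuit tripped, as well as servers with high concurrency; the filtering logic lives in an AvailabilityPredicate. In effect it checks the running state of each server recorded in the status.
    3. ZoneAvoidanceRule
      Uses ZoneAvoidancePredicate and AvailabilityPredicate to decide whether a server may be chosen. The former judges whether a zone's performance is acceptable and removes unusable zones (with all their servers); AvailabilityPredicate filters out servers with too many connections.
    4. RandomRule
      Chooses a server at random.
    5. RoundRobinRule
      Round-robin selection: advances an index and chooses the server at that position.
    6. RetryRule
      Adds a retry mechanism on top of the selected load-balancing rule: if choosing a server fails, it keeps trying the subRule within a configured time window until an available server is found.
    7. WeightedResponseTimeRule
      Assigns each server a weight based on its response time: the longer the response time, the smaller the weight and the lower the chance of being chosen.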
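
    To make two of these strategies concrete, here is a minimal plain-JDK sketch of round-robin selection and of response-time weighting. It is a simplified model, not Ribbon's code: RuleSketch and its methods are hypothetical names, servers are plain strings, and the weight formula (sum of all average response times minus the server's own) is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified, hypothetical models of two selection strategies; not Ribbon's classes. */
final class RuleSketch {

    private final AtomicInteger nextIndex = new AtomicInteger(0);

    /** Round robin: advance a shared counter and wrap around the server list. */
    String roundRobin(List<String> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        int index = Math.floorMod(nextIndex.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    /**
     * Response-time weighting: each server's weight is (sum of all average
     * response times - its own average), so slower servers get smaller weights.
     * Returns cumulative weights, used as upper bounds for a random draw.
     */
    static List<Double> cumulativeWeights(List<Double> avgResponseTimes) {
        double total = 0.0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> cumulative = new ArrayList<>();
        double running = 0.0;
        for (double t : avgResponseTimes) {
            running += total - t;
            cumulative.add(running);
        }
        return cumulative;
    }

    /** Returns the index of the first cumulative weight above the draw (list must be non-empty). */
    static int pickByWeight(List<Double> cumulative, double draw) {
        for (int i = 0; i < cumulative.size(); i++) {
            if (draw < cumulative.get(i)) {
                return i;
            }
        }
        return cumulative.size() - 1;
    }
}
```

    In this model a caller would draw a random number in [0, last cumulative weight) and pass it to pickByWeight; a server averaging 100 ms gets three times the range of one averaging 300 ms.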

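    Next, let's look at why using the service name together with the @LoadBalanced annotation is enough to call the service and also get load balancing.

    On startup, Spring Boot loads the META-INF/spring.factories files and registers every class listed under the EnableAutoConfiguration key in the Spring container, so Ribbon's entry point is the RibbonAutoConfiguration class.
    That class mainly registers some beans; the one we need to look at is LoadBalancerAutoConfiguration, which it is ordered before: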
    @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
    
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
    

    As we can see, every RestTemplate annotated with @LoadBalanced is injected here.

    @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Qualifier
    public @interface LoadBalanced {
    }
    

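    One thing to note: the @LoadBalanced annotation itself contains nothing; it is just a @Qualifier. A @Qualifier can be thought of as a marker, meaning that only beans carrying that marker are injected.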
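
    The marker idea can be reproduced with plain reflection. In the sketch below, Marker stands in for Spring's @Qualifier and Balanced for @LoadBalanced; both names, and the Candidates holder class, are hypothetical. Spring's actual qualifier matching is more involved, so treat this only as an illustration of selecting by a meta-annotation.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

final class QualifierSketch {

    /** Hypothetical stand-in for Spring's @Qualifier meta-annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.ANNOTATION_TYPE)
    @interface Marker { }

    /** Hypothetical stand-in for @LoadBalanced: empty, it only carries @Marker. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @Marker
    @interface Balanced { }

    /** Holder whose fields play the role of candidate beans. */
    static final class Candidates {
        @Balanced String balancedTemplate = "balanced";
        String plainTemplate = "plain";
    }

    /** Returns the names of fields carrying an annotation that is itself annotated with @Marker. */
    static List<String> markedFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            for (Annotation annotation : field.getAnnotations()) {
                if (annotation.annotationType().isAnnotationPresent(Marker.class)) {
                    names.add(field.getName());
                }
            }
        }
        return names;
    }
}
```

    Only balancedTemplate is selected, which mirrors how a plain RestTemplate bean without the annotation is left untouched.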

        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        customizer.customize(restTemplate);
                    }
                }
            });
        }
        @Configuration
        @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
        static class LoadBalancerInterceptorConfig {
            @Bean
            public LoadBalancerInterceptor ribbonInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
        }
    

    As we can see, this class simply adds the LoadBalancerInterceptor to those RestTemplate instances.

            @Override
            public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
                if (this.iterator.hasNext()) {
                    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                    return nextInterceptor.intercept(request, body, this);
                }
                else {
                    HttpMethod method = request.getMethod();
                    Assert.state(method != null, "No standard HTTP method");
                    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                    request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                    if (body.length > 0) {
                        if (delegate instanceof StreamingHttpOutputMessage) {
                            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                            streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                        }
                        else {
                            StreamUtils.copy(body, delegate.getBody());
                        }
                    }
                    return delegate.execute();
                }
            }
    

    RestTemplate.getForObject eventually reaches this execute method, so the call ends up in LoadBalancerInterceptor. Let's continue with LoadBalancerInterceptor.intercept:

        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
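
    The two pieces above, the iterator-driven interceptor chain and the interceptor that reads the service name from the URI host, can be modeled in a few lines of plain Java. Interceptor, Execution, Chain and the fixed address passed to loadBalancing are hypothetical simplifications; the real interceptor asks the load balancer for a server instead.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class InterceptorChainSketch {

    /** Hypothetical, simplified counterpart of ClientHttpRequestInterceptor. */
    interface Interceptor {
        String intercept(URI uri, Execution execution);
    }

    /** Hypothetical, simplified counterpart of ClientHttpRequestExecution. */
    interface Execution {
        String execute(URI uri);
    }

    /** Walks the interceptors in order; once none are left, "sends" the request. */
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(URI uri) {
            if (iterator.hasNext()) {
                return iterator.next().intercept(uri, this);
            }
            return "GET " + uri; // stand-in for the real HTTP call
        }
    }

    /** Treats the URI host as a service name and swaps in a fixed address. */
    static Interceptor loadBalancing(String address) {
        return (uri, execution) -> {
            String serviceName = uri.getHost();
            if (serviceName == null) {
                throw new IllegalStateException(
                        "Request URI does not contain a valid hostname: " + uri);
            }
            URI resolved = URI.create(uri.getScheme() + "://" + address + uri.getRawPath());
            return execution.execute(resolved);
        };
    }

    static String call(String url, String address) {
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.add(loadBalancing(address));
        return new Chain(interceptors).execute(URI.create(url));
    }
}
```

    One practical consequence of relying on URI.getHost(): java.net.URI returns null for a host containing an underscore, so a service name such as ribbon_server would trigger the "valid hostname" failure.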
    

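    Here loadBalancer is a RibbonLoadBalancerClient, which is registered by RibbonAutoConfiguration. serviceName is the name of the service being called, taken from the host part of the URI, which in this example is ribbon-server.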

        @Bean
        @ConditionalOnMissingBean(LoadBalancerClient.class)
        public LoadBalancerClient loadBalancerClient() {
            return new RibbonLoadBalancerClient(springClientFactory());
        }
    

    Next, step into RibbonLoadBalancerClient.execute(String serviceId, LoadBalancerRequest<T> request):

        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            // Obtain the load balancer
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                    serviceId), serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    

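    getLoadBalancer eventually arrives at SpringClientFactory.getInstance and its parent class NamedContextFactory: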

        @Override
        public <C> C getInstance(String name, Class<C> type) {
            C instance = super.getInstance(name, type);
            if (instance != null) {
                return instance;
            }
            IClientConfig config = getInstance(name, IClientConfig.class);
            return instantiateWithConfig(getContext(name), type, config);
        }
    
        public <T> T getInstance(String name, Class<T> type) {
            AnnotationConfigApplicationContext context = getContext(name);
            if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                    type).length > 0) {
                return context.getBean(type);
            }
            return null;
        }
    
        protected AnnotationConfigApplicationContext getContext(String name) {
            if (!this.contexts.containsKey(name)) {
                synchronized (this.contexts) {
                    if (!this.contexts.containsKey(name)) {
                        this.contexts.put(name, createContext(name));
                    }
                }
            }
            return this.contexts.get(name);
        }
    
        protected AnnotationConfigApplicationContext createContext(String name) {
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
            if (this.configurations.containsKey(name)) {
                for (Class<?> configuration : this.configurations.get(name)
                        .getConfiguration()) {
                    context.register(configuration);
                }
            }
            for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
                if (entry.getKey().startsWith("default.")) {
                    for (Class<?> configuration : entry.getValue().getConfiguration()) {
                        context.register(configuration);
                    }
                }
            }
            context.register(PropertyPlaceholderAutoConfiguration.class,
                    this.defaultConfigType);
            context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                    this.propertySourceName,
                    Collections.<String, Object> singletonMap(this.propertyName, name)));
            if (this.parent != null) {
                // Uses Environment from parent as well as beans
                context.setParent(this.parent);
            }
            context.setDisplayName(generateDisplayName(name));
            context.refresh();
            return context;
        }
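
    The essence of getContext and createContext is a lazily built, per-name cache in which every entry is seeded with the client name. The sketch below models that with plain maps; NamedContextSketch is a hypothetical name, a Map stands in for the child application context, and ConcurrentHashMap.computeIfAbsent replaces the double-checked locking used in the original.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a per-client context cache; a Map stands in for a Spring context. */
final class NamedContextSketch {

    private final Map<String, Map<String, String>> contexts = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();
    private final String propertyName;

    NamedContextSketch(String propertyName) {
        this.propertyName = propertyName;
    }

    /** Returns the context for this client name, creating it on first use only. */
    Map<String, String> getContext(String name) {
        return contexts.computeIfAbsent(name, this::createContext);
    }

    /** Seeds the new context with propertyName=name, like the MapPropertySource above. */
    private Map<String, String> createContext(String name) {
        created.incrementAndGet();
        Map<String, String> context = new HashMap<>();
        context.put(propertyName, name);
        return context;
    }

    int createdCount() {
        return created.get();
    }
}
```

    Because each service name gets its own context, two clients can hold different rules, pings and server lists without interfering with each other.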
    

    Let us first pin down the values of three fields: defaultConfigType, propertySourceName and propertyName are RibbonClientConfiguration.class, ribbon and ribbon.client.name respectively.

    public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
    
        static final String NAMESPACE = "ribbon";
    
        public SpringClientFactory() {
            super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
        }
    }
    
        public NamedContextFactory(Class<?> defaultConfigType, String propertySourceName,
                String propertyName) {
            this.defaultConfigType = defaultConfigType;
            this.propertySourceName = propertySourceName;
            this.propertyName = propertyName;
        }
    

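    The main things createContext does are: it registers two classes, PropertyPlaceholderAutoConfiguration and RibbonClientConfiguration, in the client's Spring container, and it adds the property ribbon.client.name=ribbon-server to the Spring environment.
    Why does it do this? Let's look at the RibbonClientConfiguration class: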

    @Configuration
    @EnableConfigurationProperties
    //Order is important here, last should be the default, first should be optional
    // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
    @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
    public class RibbonClientConfiguration {
    
        public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
        public static final int DEFAULT_READ_TIMEOUT = 1000;
    
        @RibbonClientName
        private String name = "client";
    
        // TODO: maybe re-instate autowired load balancers: identified by name they could be
        // associated with ribbon clients
    
        @Autowired
        private PropertiesFactory propertiesFactory;
    
        @Bean
        @ConditionalOnMissingBean
        public IClientConfig ribbonClientConfig() {
            DefaultClientConfigImpl config = new DefaultClientConfigImpl();
            config.loadProperties(this.name);
            config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
            config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
            return config;
        }
    
        @Bean
        @ConditionalOnMissingBean
        public IRule ribbonRule(IClientConfig config) {
            if (this.propertiesFactory.isSet(IRule.class, name)) {
                return this.propertiesFactory.get(IRule.class, config, name);
            }
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }
    
        @Bean
        @ConditionalOnMissingBean
        public IPing ribbonPing(IClientConfig config) {
            if (this.propertiesFactory.isSet(IPing.class, name)) {
                return this.propertiesFactory.get(IPing.class, config, name);
            }
            return new DummyPing();
        }
    
        @Bean
        @ConditionalOnMissingBean
        @SuppressWarnings("unchecked")
        public ServerList<Server> ribbonServerList(IClientConfig config) {
            if (this.propertiesFactory.isSet(ServerList.class, name)) {
                return this.propertiesFactory.get(ServerList.class, config, name);
            }
            ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
            serverList.initWithNiwsConfig(config);
            return serverList;
        }
    
        @Bean
        @ConditionalOnMissingBean
        public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
            return new PollingServerListUpdater(config);
        }
    
        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
    
        @Bean
        @ConditionalOnMissingBean
        @SuppressWarnings("unchecked")
        public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
            if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
                return this.propertiesFactory.get(ServerListFilter.class, config, name);
            }
            ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
            filter.initWithNiwsConfig(config);
            return filter;
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
                                                                   IClientConfig config, RetryHandler retryHandler) {
            return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
        }
    
        @Bean
        @ConditionalOnMissingBean
        public RetryHandler retryHandler(IClientConfig config) {
            return new DefaultLoadBalancerRetryHandler(config);
        }
    
        @Bean
        @ConditionalOnMissingBean
        public ServerIntrospector serverIntrospector() {
            return new DefaultServerIntrospector();
        }
    
        @PostConstruct
        public void preprocess() {
            setRibbonProperty(name, DeploymentContextBasedVipAddresses.key(), name);
        }
    
        static class OverrideRestClient extends RestClient {
    
            private IClientConfig config;
            private ServerIntrospector serverIntrospector;
    
            protected OverrideRestClient(IClientConfig config,
                    ServerIntrospector serverIntrospector) {
                super();
                this.config = config;
                this.serverIntrospector = serverIntrospector;
                initWithNiwsConfig(this.config);
            }
    
            @Override
            public URI reconstructURIWithServer(Server server, URI original) {
                URI uri = updateToSecureConnectionIfNeeded(original, this.config,
                        this.serverIntrospector, server);
                return super.reconstructURIWithServer(server, uri);
            }
    
            @Override
            protected Client apacheHttpClientSpecificInitialization() {
                ApacheHttpClient4 apache = (ApacheHttpClient4) super.apacheHttpClientSpecificInitialization();
                apache.getClientHandler().getHttpClient().getParams().setParameter(
                        ClientPNames.COOKIE_POLICY, CookiePolicy.IGNORE_COOKIES);
                return apache;
            }
    
        }
    
    }
    

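    So RibbonClientConfiguration is registered in the container so that the container can later load the beans declared in it. We also see a name field; its value comes from the property ribbon.client.name=ribbon-server added earlier and is injected through @RibbonClientName: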

    @Target({ ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
            ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Value("${ribbon.client.name}")
    public @interface RibbonClientName {
    
    }
    

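    In addition, when ZoneAwareLoadBalancer is initialized (through the constructor of its parent class DynamicServerListLoadBalancer), the ribbon-server.ribbon.listOfServers value we configured is wrapped into Server objects: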

        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.isSecure = false;
            this.useTunnel = false;
            this.serverListUpdateInProgress = new AtomicBoolean(false);
            this.updateAction = new NamelessClass_1();
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
            }
    
            this.restOfInit(clientConfig);
        }
    
        void restOfInit(IClientConfig clientConfig) {
            boolean primeConnection = this.isEnablePrimingConnections();
            this.setEnablePrimingConnections(false);
            this.enableAndInitLearnNewServersFeature();
            this.updateListOfServers();
            if (primeConnection && this.getPrimeConnections() != null) {
                this.getPrimeConnections().primeConnections(this.getReachableServers());
            }
    
            this.setEnablePrimingConnections(primeConnection);
            LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
        }
    
        @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList();
            if (this.serverListImpl != null) {
                servers = this.serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
                if (this.filter != null) {
                    servers = this.filter.getFilteredListOfServers((List)servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
                }
            }
    
            this.updateAllServerList((List)servers);
        }
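
    Turning a listOfServers value into server objects is essentially a split-and-parse step. Below is a minimal sketch of that step, assuming comma-separated host:port entries; ServerListSketch and HostPort are hypothetical names, and the fallback to port 80 when no port is given is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class ServerListSketch {

    /** Hypothetical minimal stand-in for a server entry. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host:port,host:port" into entries, ignoring blanks and surrounding spaces. */
    static List<HostPort> parse(String listOfServers) {
        List<HostPort> servers = new ArrayList<>();
        if (listOfServers == null) {
            return servers;
        }
        for (String entry : listOfServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            // Assumption of this sketch: fall back to port 80 when none is given.
            int port = colon < 0 ? 80 : Integer.parseInt(trimmed.substring(colon + 1));
            servers.add(new HostPort(host, port));
        }
        return servers;
    }
}
```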
    
    

    At this point, the ILoadBalancer returned by ILoadBalancer loadBalancer = getLoadBalancer(serviceId) is a ZoneAwareLoadBalancer. getServer(loadBalancer) then asks it to choose a server:

    
        public Server chooseServer(Object key) {
            if (this.counter == null) {
                this.counter = this.createCounter();
            }
    
            this.counter.increment();
            if (this.rule == null) {
                return null;
            } else {
                try {
                    return this.rule.choose(key);
                } catch (Exception var3) {
                    logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", new Object[]{this.name, key, var3});
                    return null;
                }
            }
        }
    

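    In the end, the IRule is invoked to pick a concrete Server. Note that with the RibbonClientConfiguration shown above, the IRule defaults to ZoneAvoidanceRule when none is configured; it selects round-robin among the servers that pass its filters. RoundRobinRule is only the built-in default of BaseLoadBalancer when no rule is passed in.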
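
    Rules such as AvailabilityFilteringRule and ZoneAvoidanceRule combine a filter with round-robin selection over whatever remains. A minimal sketch of that combination follows; FilteringRuleSketch is a hypothetical class, servers are plain strings, and the predicate is supplied by the caller rather than derived from server statistics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical "filter first, then round robin" rule; not Ribbon's implementation. */
final class FilteringRuleSketch {

    private final AtomicInteger counter = new AtomicInteger();
    private final Predicate<String> eligible;

    FilteringRuleSketch(Predicate<String> eligible) {
        this.eligible = eligible;
    }

    /** Drops ineligible servers, then rotates through the remaining ones. */
    String choose(List<String> allServers) {
        List<String> candidates = new ArrayList<>();
        for (String server : allServers) {
            if (eligible.test(server)) {
                candidates.add(server);
            }
        }
        if (candidates.isEmpty()) {
            return null; // nothing passed the filter
        }
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }
}
```

    Returning null when nothing passes the filter matches the caller shown earlier, which turns a null server into "No instances available".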

        @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if(serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer)serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            catch (IOException ex) {
                statsRecorder.recordStats(ex);
                throw ex;
            }
            catch (Exception ex) {
                statsRecorder.recordStats(ex);
                ReflectionUtils.rethrowRuntimeException(ex);
            }
            return null;
        }
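
    The try/catch structure above follows a common pattern: run the request, record the outcome either way, and rethrow IOException unchanged so the caller sees the original failure. A small plain-JDK sketch of that pattern is given below; StatsRecordingSketch and its counters are hypothetical and far simpler than RibbonStatsRecorder.

```java
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical recorder that counts outcomes around a request. */
final class StatsRecordingSketch {

    final AtomicInteger successes = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    /** Runs the request, records success or failure, and propagates any exception. */
    <T> T execute(Callable<T> request) throws IOException {
        try {
            T value = request.call();
            successes.incrementAndGet();
            return value;
        } catch (IOException ex) {
            // Rethrown as-is so callers can handle I/O failures normally.
            failures.incrementAndGet();
            throw ex;
        } catch (Exception ex) {
            failures.incrementAndGet();
            if (ex instanceof RuntimeException) {
                throw (RuntimeException) ex;
            }
            throw new UndeclaredThrowableException(ex);
        }
    }
}
```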
    

    This in turn calls request.apply; the request here is the one we passed in earlier, created by LoadBalancerRequestFactory.createRequest:

        public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
                final byte[] body, final ClientHttpRequestExecution execution) {
            return instance -> {
                HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
                if (transformers != null) {
                    for (LoadBalancerRequestTransformer transformer : transformers) {
                        serviceRequest = transformer.transformRequest(serviceRequest, instance);
                    }
                }
                return execution.execute(serviceRequest, body);
            };
        }
    

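    ServiceRequestWrapper replaces the service name with the concrete address of the chosen server (through reconstructURI), and then the HTTP request is executed: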

        @Override
        public URI reconstructURI(ServiceInstance instance, URI original) {
            Assert.notNull(instance, "instance can not be null");
            String serviceId = instance.getServiceId();
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
    
            URI uri;
            Server server;
            if (instance instanceof RibbonServer) {
                RibbonServer ribbonServer = (RibbonServer) instance;
                server = ribbonServer.getServer();
                uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
            } else {
                server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
                IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
                ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
                uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                        serverIntrospector, server);
            }
            return context.reconstructURIWithServer(server, uri);
        }
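
    The effect of the URI reconstruction can be shown with java.net.URI alone: keep the scheme, path and query, and swap in the chosen server's host and port. This sketch does not mirror Ribbon's internal implementation; ReconstructUriSketch and withServer are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {

    /** Returns a copy of the original URI that points at the given host and port. */
    static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException ex) {
            throw new IllegalArgumentException("Cannot rebuild URI from " + original, ex);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://ribbon-server/info");
        System.out.println(withServer(original, "localhost", 8081));
    }
}
```

    For the example in this article, http://ribbon-server/info becomes http://localhost:8081/info once localhost:8081 has been chosen.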
    


          Article link: https://www.haomeiwen.com/subject/zfuzlktx.html