美文网首页
Spring Cloud Ribbon 源码分析

Spring Cloud Ribbon 源码分析

作者: 想起个帅气的头像 | 来源:发表于2021-01-04 00:31 被阅读0次

    Spring Cloud Ribbon 源码分析

    前言

    原理介绍

    ribbon提供了http请求负载均衡的能力,既然要扩展调度能力,就需要在请求之前,通过某种调度策略选择合适的server来调用。
    想要实现这个效果需要以下几步:

    1. 对http的请求已经拦截,对其进行扩展
    2. 通过调度策略选择合适的server
    3. 改写http的ip和port来定向调用

    源码分析

    1. http请求拦截

    首先介绍ribbon是如何进行给请求增加拦截器,需要用到LoadBalancerAutoConfiguration

    LoadBalancerAutoConfiguration
    public class LoadBalancerAutoConfiguration {
    
        @LoadBalanced
        @Autowired(required = false)
        private List<RestTemplate> restTemplates = Collections.emptyList();
    
        @Bean
        public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
                final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
            return () -> restTemplateCustomizers.ifAvailable(customizers -> {
                for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
                    for (RestTemplateCustomizer customizer : customizers) {
                        customizer.customize(restTemplate);
                    }
                }
            });
        }
    
            @Bean
            public LoadBalancerInterceptor loadBalancerInterceptor(
                    LoadBalancerClient loadBalancerClient,
                    LoadBalancerRequestFactory requestFactory) {
                return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
            }
    
            @Bean
            @ConditionalOnMissingBean
            public RestTemplateCustomizer restTemplateCustomizer(
                    final LoadBalancerInterceptor loadBalancerInterceptor) {
                return restTemplate -> {
                    List<ClientHttpRequestInterceptor> list = new ArrayList<>(
                            restTemplate.getInterceptors());
                    list.add(loadBalancerInterceptor);
                    restTemplate.setInterceptors(list);
                };
            }
    

    LoadBalancerAutoConfiguration的restTemplates通过@LoadBalanced注解,将所有带有此注解的RestTemplate的对象都注入到list中。@LoadBalanced内部其实是继承@Qualifier实现。

    通过RestTemplateCustomizer来对每个restTemplate实现扩展,将LoadBalancerInterceptor也加入到拦截器列表中

    在整个restTemplate的调用过程中,会从getForObject -> execute -> doExecute -> request.execute 中来对拦截器进行处理。

        protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
                @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
    
            // ...
            ClientHttpResponse response = null;
            try {
                ClientHttpRequest request = createRequest(url, method);
                if (requestCallback != null) {
                    requestCallback.doWithRequest(request);
                }
                response = request.execute();
                handleResponse(url, method, response);
                return (responseExtractor != null ? responseExtractor.extractData(response) : null);
            }
            catch (IOException ex) {
                // ...
            }
            finally {
                if (response != null) {
                    response.close();
                }
            }
        }
    
        private class InterceptingRequestExecution implements ClientHttpRequestExecution {
      
            private final Iterator<ClientHttpRequestInterceptor> iterator;
    
            public InterceptingRequestExecution() {
                this.iterator = interceptors.iterator();
            }
    
            @Override
            public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
                if (this.iterator.hasNext()) {
                    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                    return nextInterceptor.intercept(request, body, this);
                }
                else {
                    HttpMethod method = request.getMethod();
                    Assert.state(method != null, "No standard HTTP method");
                    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                    request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                    if (body.length > 0) {
                        if (delegate instanceof StreamingHttpOutputMessage) {
                            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                            streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                        }
                        else {
                            StreamUtils.copy(body, delegate.getBody());
                        }
                    }
                    return delegate.execute();
                }
            }
        }
    

    一直调用到InterceptingRequestExecution的execute方法中,这里先判断当前是否有拦截器,如果存在则调用intercept,我们这里通过LoadBalancerAutoConfiguration注入了LoadBalancerInterceptor,具体看下LoadBalancerInterceptor的实现。

    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
    
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
        }
    }
    

    这里的loadBalancer是在创建Interceptor时,注入的RibbonLoadBalancerClient,自动注入的过程在RibbonAutoConfiguration中

    RibbonAutoConfiguration
    public class RibbonAutoConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public SpringClientFactory springClientFactory() {
            SpringClientFactory factory = new SpringClientFactory();
            factory.setConfigurations(this.configurations);
            return factory;
        }
    
        @Bean
        @ConditionalOnMissingBean(LoadBalancerClient.class)
        public LoadBalancerClient loadBalancerClient() {
            return new RibbonLoadBalancerClient(springClientFactory());
        }
    }
    

    至此完成了前期的请求调用拦截的部分,开始进入ribbon调度策略的选择环节。

    2. 调度策略

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
                throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer, hint);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                    isSecure(server, serviceId),
                    serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    

    首先实例化ILoadBalancer的bean,默认会注入ZoneAwareLoadBalancer的bean,ZoneAwareLoadBalancer的继承关系如下图。


    ILoadBalancer的注入实现在RibbonClientConfiguration中。

    RibbonClientConfiguration
        // client的请求的默认配置
        @Bean
        @ConditionalOnMissingBean
        public IClientConfig ribbonClientConfig() {
            DefaultClientConfigImpl config = new DefaultClientConfigImpl();
            config.loadProperties(this.name);
            config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
            config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
            config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
            return config;
        }
        // 调度规则
        @Bean
        @ConditionalOnMissingBean
        public IRule ribbonRule(IClientConfig config) {
            if (this.propertiesFactory.isSet(IRule.class, name)) {
                return this.propertiesFactory.get(IRule.class, config, name);
            }
            ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
            rule.initWithNiwsConfig(config);
            return rule;
        }
        // 维持心跳ping
        @Bean
        @ConditionalOnMissingBean
        public IPing ribbonPing(IClientConfig config) {
            if (this.propertiesFactory.isSet(IPing.class, name)) {
                return this.propertiesFactory.get(IPing.class, config, name);
            }
            return new DummyPing();
        }
        // 服务端提供的server列表
        @Bean
        @ConditionalOnMissingBean
        @SuppressWarnings("unchecked")
        public ServerList<Server> ribbonServerList(IClientConfig config) {
            if (this.propertiesFactory.isSet(ServerList.class, name)) {
                return this.propertiesFactory.get(ServerList.class, config, name);
            }
            ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
            serverList.initWithNiwsConfig(config);
            return serverList;
        }
        // server列表动态更新
        @Bean
        @ConditionalOnMissingBean
        public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
            return new PollingServerListUpdater(config);
        }
        // loadbalance的客户端对象
        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
        // server列表选择过滤器
        @Bean
        @ConditionalOnMissingBean
        @SuppressWarnings("unchecked")
        public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
            if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
                return this.propertiesFactory.get(ServerListFilter.class, config, name);
            }
            ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
            filter.initWithNiwsConfig(config);
            return filter;
        }
        //  loadbalancer的上下文配置
        @Bean
        @ConditionalOnMissingBean
        public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
                IClientConfig config, RetryHandler retryHandler) {
            return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
        }
        // 重试处理器
        @Bean
        @ConditionalOnMissingBean
        public RetryHandler retryHandler(IClientConfig config) {
            return new DefaultLoadBalancerRetryHandler(config);
        }
        // server 检查器(内省),检查是否安全端口,获取基本信息等。
        @Bean
        @ConditionalOnMissingBean
        public ServerIntrospector serverIntrospector() {
            return new DefaultServerIntrospector();
        }
    

    ZoneAwareLoadBalancer的实例化还依赖的其他的bean,如ZoneAvoidanceRuleDummyPingConfigurationBasedServerListDefaultLoadBalancerRetryHandler

    ZoneAwareLoadBalancer

    ZoneAwareLoadBalancer 在构造方法中,会完成一系列的初始化动作,如serverList的设置、开启server刷新任务,初始化loadBalanceStat,配置负载均衡策略、ping策略、filter等操作。

    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                     IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
        }
    
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                             ServerList<T> serverList, ServerListFilter<T> filter,
                                             ServerListUpdater serverListUpdater) {
            super(clientConfig, rule, ping);
            this.serverListImpl = serverList;
            this.filter = filter;
            this.serverListUpdater = serverListUpdater;
            if (filter instanceof AbstractServerListFilter) {
                ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
            }
            restOfInit(clientConfig);
        }
    
    restOfInit()
    void restOfInit(IClientConfig clientConfig) {
            boolean primeConnection = this.isEnablePrimingConnections();
            // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
            this.setEnablePrimingConnections(false);
            enableAndInitLearnNewServersFeature();
    
            updateListOfServers();
            if (primeConnection && this.getPrimeConnections() != null) {
                this.getPrimeConnections()
                        .primeConnections(getReachableServers());
            }
            this.setEnablePrimingConnections(primeConnection);
            LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
        }
    

    在restOfInit方法中,有两个关键方法:enableAndInitLearnNewServersFeatureupdateListOfServers

    enableAndInitLearnNewServersFeature()
    public void enableAndInitLearnNewServersFeature() {
            LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
            serverListUpdater.start(updateAction);
        }
    
     @Override
        public synchronized void start(final UpdateAction updateAction) {
            if (isActive.compareAndSet(false, true)) {
                final Runnable wrapperRunnable = new Runnable() {
                    @Override
                    public void run() {
                        if (!isActive.get()) {
                            if (scheduledFuture != null) {
                                scheduledFuture.cancel(true);
                            }
                            return;
                        }
                        try {
                            updateAction.doUpdate();
                            lastUpdated = System.currentTimeMillis();
                        } catch (Exception e) {
                            logger.warn("Failed one update cycle", e);
                        }
                    }
                };
    
                scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                        wrapperRunnable,
                        initialDelayMs,
                        refreshIntervalMs,
                        TimeUnit.MILLISECONDS
                );
            } else {
                logger.info("Already active, no-op");
            }
        }
    

    可以看到这个方法开启了一个schedule,这个schedule的用途是每隔一段时间刷新serverList。这里传入了一个UpdateAction,这个UpdateAction是

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                updateListOfServers();
            }
        };
    

    通过调用updateListOfServers来刷新serverList,而刷新的时间间隔refreshIntervalMs默认是30s。

    updateListOfServers
    @VisibleForTesting
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    

    这里的servers其实就是我们配置的listOfServers列表,在获取到最新的servers之后,会调用updateAllServerList来更新缓存。

     protected void updateAllServerList(List<T> ls) {
            // other threads might be doing this - in which case, we pass
            if (serverListUpdateInProgress.compareAndSet(false, true)) {
                try {
                    for (T s : ls) {
                        s.setAlive(true); // set so that clients can start using these
                                          // servers right away instead
                                          // of having to wait out the ping cycle.
                    }
                    setServersList(ls);
                    super.forceQuickPing();
                } finally {
                    serverListUpdateInProgress.set(false);
                }
            }
        }
    
    @Override
        public void setServersList(List lsrv) {
            super.setServersList(lsrv);
            List<T> serverList = (List<T>) lsrv;
            Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
            for (Server server : serverList) {
                // make sure ServerStats is created to avoid creating them on hot
                // path
                getLoadBalancerStats().getSingleServerStat(server);
                String zone = server.getZone();
                if (zone != null) {
                    zone = zone.toLowerCase();
                    List<Server> servers = serversInZones.get(zone);
                    if (servers == null) {
                        servers = new ArrayList<Server>();
                        serversInZones.put(zone, servers);
                    }
                    servers.add(server);
                }
            }
            setServerListForZones(serversInZones);
        }
    

    首先在super.setSersList中,将全局的server列表,也就是allServerList做更新,
    在for循环中,调用getSingleServerStat来刷新一下内部cache,最后设置zone。

    这里需要说明一下LoadBalancerStats

    LoadBalancerStats

    public class LoadBalancerStats implements IClientConfigAware {
        
        private static final String PREFIX = "LBStats_";
        
        String name;
        
        // Map<Server,ServerStats> serverStatsMap = new ConcurrentHashMap<Server,ServerStats>();
        volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
        volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
        
        private volatile CachedDynamicIntProperty connectionFailureThreshold;
            
        private volatile CachedDynamicIntProperty circuitTrippedTimeoutFactor;
    
        private volatile CachedDynamicIntProperty maxCircuitTrippedTimeout;
    
        private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES = 
            DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30);
        
        private final LoadingCache<Server, ServerStats> serverStatsCache = 
            CacheBuilder.newBuilder()
                .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
                .removalListener(new RemovalListener<Server, ServerStats>() {
                    @Override
                    public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
                        notification.getValue().close();
                    }
                })
                .build(
                    new CacheLoader<Server, ServerStats>() {
                        public ServerStats load(Server server) {
                            return createServerStats(server);
                        }
                    });
    
    

    ribbon通过这个对象来记录每个server的运行特征和统计数据,如各种连接时间,响应时间,熔断配置,请求次数,serverStat,zoneStat等。

    LoadBalancerStats的这些参数,会作为负载均衡选择策略的重要参考依据。

    至此,ZoneAwareLoadBalancer的bean实例化完成。

    回到调度选择的开始处,在完成loadBalance的instance之后,开始选择Server。

    getServer
    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
            if (loadBalancer == null) {
                return null;
            }
            // Use 'default' on a null hint, or just pass it on?
            return loadBalancer.chooseServer(hint != null ? hint : "default");
        }
    
    chooseServer
     @Override
        public Server chooseServer(Object key) {
            if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
                logger.debug("Zone aware logic disabled or there is only one zone");
                return super.chooseServer(key);
            }
            Server server = null;
            try {
                LoadBalancerStats lbStats = getLoadBalancerStats();
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (triggeringLoad == null) {
                    triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
                }
    
                if (triggeringBlackoutPercentage == null) {
                    triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
                }
                Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
                logger.debug("Available zones: {}", availableZones);
                if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                    logger.debug("Zone chosen: {}", zone);
                    if (zone != null) {
                        BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                        server = zoneLoadBalancer.chooseServer(key);
                    }
                }
            } catch (Exception e) {
                logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
            }
            if (server != null) {
                return server;
            } else {
                logger.debug("Zone avoidance logic is not invoked.");
                return super.chooseServer(key);
            }
        }
    

    选择分两部分

    1. 如果没有配置zone,即只有一个zone,直接调用super.chooseServer。
    2. 如果有一个以上的zone, 则根据zone的算法选择一个server。

    先看一下不使用zone算法的实现。

    public Server chooseServer(Object key) {
            if (counter == null) {
                counter = createCounter();
            }
            counter.increment();
            if (rule == null) {
                return null;
            } else {
                try {
                    return rule.choose(key);
                } catch (Exception e) {
                    logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                    return null;
                }
            }
        }
    

    增加调用次数,并通过role的实现选择一个server。
    这里的zone是ZoneAvoidanceRule

    @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }       
        }
    
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
            List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
            if (eligible.size() == 0) {
                return Optional.absent();
            }
            return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
        }
    

    内部通过轮询的机制,来选择server,除了ZoneAvoidanceRule的实现之外,还有提供了很多的其他rule策略,如下图:


    比如WeighedResponseTimeRule就是通过响应时间来动态选择server,具体算法的数据就是依赖LoadBalanceStat的数据统计。

    3. uri改写和调用

    继续回到调度的最开始,选择好server之后,就这些对象封装成RibbonServer,就继续执行execute方法。

    @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if (serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer) serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            catch (IOException ex) {
                statsRecorder.recordStats(ex);
                throw ex;
            }
            catch (Exception ex) {
                statsRecorder.recordStats(ex);
                ReflectionUtils.rethrowRuntimeException(ex);
            }
            return null;
        }
    

    这个方法主要就是request.apply的调用。
    request是在前面的调用中,this.requestFactory.createRequest(request, body, execution) 来构建。

    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) {
            return instance -> {
                HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
                if (this.transformers != null) {
                    for (LoadBalancerRequestTransformer transformer : this.transformers) {
                        serviceRequest = transformer.transformRequest(serviceRequest, instance);
                    }
                }
                return execution.execute(serviceRequest, body);
            };
        }
    

    可以看到这里通过定义了一个匿名内部类,定义了apply这个方法的实现。
    封装了一个ServiceRequestWrapper,并提供了一个扩展口,可以自定义LoadBalancerRequestTransformer来改变原有的一些实现,如可以改变之前的server选择。
    转化完毕后,继续执行execute。

    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    
            private final Iterator<ClientHttpRequestInterceptor> iterator;
    
            public InterceptingRequestExecution() {
                this.iterator = interceptors.iterator();
            }
    
            @Override
            public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
                if (this.iterator.hasNext()) {
                    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                    return nextInterceptor.intercept(request, body, this);
                }
                else {
                    HttpMethod method = request.getMethod();
                    Assert.state(method != null, "No standard HTTP method");
                    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                    request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                    if (body.length > 0) {
                        if (delegate instanceof StreamingHttpOutputMessage) {
                            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                            streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                        }
                        else {
                            StreamUtils.copy(body, delegate.getBody());
                        }
                    }
                    return delegate.execute();
                }
            }
        }
    

    这里重新创建了ClientHttpRequest,在getURI中,重构了uri的地址,将之前uri中的服务名,改成了选择的ip和端口,重新设置headers后,发起真正的request调用。

    至此,一个完成的请求的ribbon负载过程执行完成。

    相关文章

      网友评论

          本文标题:Spring Cloud Ribbon 源码分析

          本文链接:https://www.haomeiwen.com/subject/lkexoktx.html