美文网首页
SpringCloud-源码分析 Ribbon

SpringCloud-源码分析 Ribbon

作者: 叩丁狼教育 | 来源:发表于2018-12-06 09:55 被阅读77次

    本文作者:陈刚,叩丁狼高级讲师。原创文章,转载请注明出处。

    本文章会通过断点跟踪的方式来解读 Ribbon 源码 ,希望同学们把文章看完之后不仅能够了解 Ribbon的实现原理,还能掌握源码解读的方式和技巧(重要)。

    回顾

    回顾一下我们的 Ribbon部分内容我们当时使用TestTemplate + LoadBalanced 做了这样的一个案例


    叩丁狼教育.png

    当时我们在配置类中做了如下Bean的定义去开启了RestTemplate的负载均衡功能

    //通过@LoadBalanced注解表明这个restRemplate开启负载均衡的功能。
        //RestTemplate是spring内置的http请求封装
        @Bean
        @LoadBalanced
        RestTemplate restTemplate() {
            return new RestTemplate();
        }
    

    然后在Consumer中去请求Producer服务(当然会同时开启多个Producer服务)

    //访问 PRODUCER 服务
    String result = 
    restTemplate.getForObject("http://PRODUCER/provide?name="+name,String.class);
    

    而我们需要达到的效果是该请求多次调用会从不同的Producer服务获取到结果(根据负载均衡规则),然而我们发的请求始终只会有一个呀,那么它要如何才能实现服务之间的切换调用呢?那么猜想一下,Ribbon是不是需要需要先拦截到我们的请求,然后根据我们定义的负载均衡算法,然后从服务清单中去选择合适的服务实例,然后完成调用呢???(思考一下)

    那么接下来我们就来对这样的一个请求进行源码追踪分析。

    @LoadBalanced

    我们先来研究一下@LoadBalanced是一个什么东西,查看他的源码如下

    /**
     * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
     * @author Spencer Gibb
     */
    @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Qualifier
    public @interface LoadBalanced {
    }
    
    

    注释 “Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient” 告诉我们:@LoadBalanced标签是用来给RestTemplate标记,以使用LoadBalancerClient(负载均衡的客户端)来配置它。
    我们继续追踪 LoadBalancerClient的源码

    /**
     * Represents a client side load balancer
     * @author Spencer Gibb
     */
    public interface LoadBalancerClient extends ServiceInstanceChooser {
    
        /**
         * execute request using a ServiceInstance from the LoadBalancer for the specified
         * service
         * @param serviceId the service id to look up the LoadBalancer
         * @param request allows implementations to execute pre and post actions such as
         * incrementing metrics
         * @return the result of the LoadBalancerRequest callback on the selected
         * ServiceInstance
         */
        <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
    
        /**
         * execute request using a ServiceInstance from the LoadBalancer for the specified
         * service
         * @param serviceId the service id to look up the LoadBalancer
         * @param serviceInstance the service to execute the request to
         * @param request allows implementations to execute pre and post actions such as
         * incrementing metrics
         * @return the result of the LoadBalancerRequest callback on the selected
         * ServiceInstance
         */
        <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
    
        /**
         * Create a proper URI with a real host and port for systems to utilize.
         * Some systems use a URI with the logical serivce name as the host,
         * such as http://myservice/path/to/service.  This will replace the
         * service name with the host:port from the ServiceInstance.
         * @param instance
         * @param original a URI with the host as a logical service name
         * @return a reconstructed URI
         */
        URI reconstructURI(ServiceInstance instance, URI original);
    }
    
    

    注释“Represents a client side load balancer”表示该接口它是一个客户端负载均衡器 ,提供了几个方法,翻译方法上的注释得知他们的作用大致如下
    execute : 使用LoadBalancer中的ServiceInstance为指定的服务执行请求,说白了就是通过它来实现服务的请求调用。
    reconstructURI:使用真实主机和端口创建适当的URI以供系统使用,获取要调用的服务的主机和端口

    并且该接口它继承ServiceInstanceChooser接口

    /**
     * Implemented by classes which use a load balancer to choose a server to
     * send a request to.
     *
     * @author Ryan Baxter
     */
    public interface ServiceInstanceChooser {
    
        /**
         * Choose a ServiceInstance from the LoadBalancer for the specified service
         * @param serviceId the service id to look up the LoadBalancer
         * @return a ServiceInstance that matches the serviceId
         */
        ServiceInstance choose(String serviceId);
    }
    

    翻译接口上的注释“ Implemented by classes which use a load balancer to choose a server to send a request to.”大致意思为: 使用负载均衡器选择一个服务,然后去发起请求,而 choose方法的大致作用为:从LoadBalancer负载均衡器中为指定的服务(serviceId)选择一个服务实例(ServiceInstance) ,其实到这里我们大致已经明白了LoadBalancerClient的目的,就是通过choose方法选择要调用的服务实例,通过reconstructURI获取服务和主机和端口,然后通过execute执行服务的调用,而 RibbonLoadBalancerClient是对 LoadBalancerClient 的实现 ,他们的层级关系如下(idea中按ctrl+alt+u查看):


    image.png

    那么LoadBalancer到底是怎么让RestTtemplate 实现负载均衡的呢?要揭露这个答案我们必须得跟踪RestTemplate的的请求,断点一下RestTemplate服务调用的代码,然后去浏览器请求该方法触发断点,看看底层是如何实现调用的

      @RequestMapping("/consumer")
        public String consumer(@RequestParam("name") String name){
            //访问 PRODUCER 服务
            String result = restTemplate.getForObject("http://PRODUCER/provide?name="+name,String.class);
            return result;
        }
    
    

    跟踪 getForObject 方法进入

    public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
          RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);
          HttpMessageConverterExtractor<T> responseExtractor =
                    new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
          return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);
        }
    

    这里继续调用了RestTemplate.execute 方法,并且把调用的服务地址传入进去,然后使用HttpMethod.GET方式进行调用,继续跟踪下去

    protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
                @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
    
            Assert.notNull(url, "'url' must not be null");
            Assert.notNull(method, "'method' must not be null");
            ClientHttpResponse response = null;
            try {
                ClientHttpRequest request = createRequest(url, method);
                if (requestCallback != null) {
                    requestCallback.doWithRequest(request);
                }
                response = request.execute();
                handleResponse(url, method, response);
                if (responseExtractor != null) {
                    return responseExtractor.extractData(response);
                }
                else {
                    return null;
                }
            }
            ...省略代码...
    

    这里把服务地址包装成 URI对象 ,然后调用 ClientHttpRequest request = createRequest(url, method); 去创建http客户端请求对象,然后调用 response = request.execute();执行请求,到这里我们依然没有找到他是如何实现负载均衡的。我们断点跟踪一下 createRequest方法最终会调用HttpAccessor#createRequest方法,HttpAccessor是个什么东西呢,我们在RestTemplate类中按下 ctrl+alt+u查看他们的层级关系如下


    叩丁狼教育.png

    请注意看,RestTemplate继承了InterceptingHttpAccessor(看名字像http请求的访问拦截的意思,后续再说) 而 InterceptingHttpAccessor又继承了HttpAccessor,而createRequest方法如下

        protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
            ClientHttpRequest request = getRequestFactory().createRequest(url, method);
            if (logger.isDebugEnabled()) {
                logger.debug("Created " + method.name() + " request for \"" + url + "\"");
            }
            return request;
        }
    

    这里调用了 getRequestFactory()方法获取到一个请求工厂,然后去创建 ClientHttpRequest ,而getRequestFactory()方法是在InterceptingHttpAccessor中进行复写实现的,跟踪interceptingHttpAccessor#getRequestFactory方法如下:

    public ClientHttpRequestFactory getRequestFactory() {
            List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
            if (!CollectionUtils.isEmpty(interceptors)) {
                ClientHttpRequestFactory factory = this.interceptingRequestFactory;
                if (factory == null) {
                    factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
                    this.interceptingRequestFactory = factory;
                }
                return factory;
            }
            else {
                return super.getRequestFactory();
            }
        }
    
    /**
         * Return the request interceptors that this accessor uses.
         * <p>The returned {@link List} is active and may get appended to.
         */
        public List<ClientHttpRequestInterceptor> getInterceptors() {
            return this.interceptors;
        }
    

    可以看到这里调用getInterceptors()方法在获取 ClientHttpRequestInterceptor http客户端请求拦截器 ,然后使用拦截器new 了一个 InterceptingClientHttpRequestFactory 工厂出来,他这里用到了请求拦截器,是想干嘛呢???(思考一下)
    我们继续往下执行结束完getRequestFactory() 方法我们跟踪一下 getRequestFactory().createRequest(url, method); 的createRequest 方法

        @Override
        protected ClientHttpRequest createRequest(
    URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
            return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
        }
    

    没有任何悬念,断点跟踪下去通过调用了InterceptingClientHttpRequestFactory#createRequest方法来创建 l了一个InterceptingClientHttpRequest对象,而InterceptingClientHttpRequest肯定是ClientHttpRequest的实现类

      /**
     * Wrapper for a {@link ClientHttpRequest} that has support for {@link ClientHttpRequestInterceptor}s.
     *
     * @author Arjen Poutsma
     * @since 3.1
     */
    class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
    
        private final ClientHttpRequestFactory requestFactory;
    
        private final List<ClientHttpRequestInterceptor> interceptors;
    
        private HttpMethod method;
    
        private URI uri;
    
    
        protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory,
                List<ClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod method) {
    
            this.requestFactory = requestFactory;
            this.interceptors = interceptors;
            this.method = method;
            this.uri = uri;
        }
    
    

    注释:“Wrapper for a {@link ClientHttpRequest} that has support for {@link ClientHttpRequestInterceptor}s.” 告诉我们
    InterceptingClientHttpRequest他是对ClientHttpRequest做了包装,并且ClientHttpRequestInterceptor提供支持,对HttpMethod (请求方式)和 URI(服务地址)亦有描述 ,我们看一下他的继承体系


    ok,看到这里我们知道 RestTemplate底层是通过 ClientHttpRequestFactory工厂创建了 InterceptingClientHttpRequest 拥有可拦截功能的http客户端请求对象,那么他是怎么调用的呢??又是如何实现拦截的呢???
    我们回到 org.springframework.web.client.RestTemplate#doExecute 方法中的 response = request.execute(); 代码继续跟踪,然后你就会发现,当执行request.execute方法的时候会跳转到 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor#intercept 拦截器方法中,看名字就能猜到这个拦截器跟负载均衡(LoadBalancer)有关,他是ClientHttpRequestInterceptor的实现,

        /**
     * @author Spencer Gibb
     * @author Dave Syer
     * @author Ryan Baxter
     * @author William Tran
     */
    public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    
        private LoadBalancerClient loadBalancer;
        private LoadBalancerRequestFactory requestFactory;
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
            this.loadBalancer = loadBalancer;
            this.requestFactory = requestFactory;
        }
    
        public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
            // for backwards compatibility
            this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
        }
    
        @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
    }
    

    继续跟踪一下 ClientHttpRequestInterceptor 接口

    /**
     * Intercepts client-side HTTP requests. Implementations of this interface can be
     * {@linkplain org.springframework.web.client.RestTemplate#setInterceptors registered}
     * with the {@link org.springframework.web.client.RestTemplate RestTemplate},
     * as to modify the outgoing {@link ClientHttpRequest} and/or the incoming
     * {@link ClientHttpResponse}.
     *
     * <p>The main entry point for interceptors is
     * {@link #intercept(HttpRequest, byte[], ClientHttpRequestExecution)}.
     *
     * @author Arjen Poutsma
     * @since 3.1
     */
    @FunctionalInterface
    public interface ClientHttpRequestInterceptor {
        /**
         * Intercept the given request, and return a response. The given
         * {@link ClientHttpRequestExecution} allows the interceptor to pass on the
         * request and response to the next entity in the chain.
         * <p>A typical implementation of this method would follow the following pattern:
         * <ol>
         * <li>Examine the {@linkplain HttpRequest request} and body</li>
         * <li>Optionally {@linkplain org.springframework.http.client.support.HttpRequestWrapper
         * wrap} the request to filter HTTP attributes.</li>
         * <li>Optionally modify the body of the request.</li>
         * <li><strong>Either</strong>
         * <ul>
         * <li>execute the request using
         * {@link ClientHttpRequestExecution#execute(org.springframework.http.HttpRequest, byte[])},</li>
         * <strong>or</strong>
         * <li>do not execute the request to block the execution altogether.</li>
         * </ul>
         * <li>Optionally wrap the response to filter HTTP attributes.</li>
         * </ol>
         * @param request the request, containing method, URI, and headers
         * @param body the body of the request
         * @param execution the request execution
         * @return the response
         * @throws IOException in case of I/O errors
         */
        ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
                throws IOException;
    

    翻译注释“Intercepts client-side HTTP requests. Implementations of this interface can be” 我们知道 ClientHttpRequestInterceptor就是对http请求的拦截到这里,我们的请求已经被拦截到,貌似离真相越来越近了,我们仔细分析一下 LoadBalancerInterceptor 的 intercept方法

    @Override
        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                final ClientHttpRequestExecution execution) throws IOException {
            final URI originalUri = request.getURI();
            String serviceName = originalUri.getHost();
            Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
            return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
        }
    

    这里通过 request(之前创建的InterceptingClientHttpRequest) 对象获取到URI调用的服务地址 ,然后获取到 调用的服务名字 serviceName ,然后调用org.springframework.cloud.client.loadbalancer.LoadBalancerClient.execute去执行 ,LoadBalancerClient是什么,不就是最开始我们分析的负载均衡器客户端嘛???他使用的是RibbonLoadBalancerClient实现类
    继续跟踪下去 RibbonLoadBalancerClient.execute

        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                    serviceId), serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    

    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);通过服务id获取负载均衡器 ,跟踪一下 ILoadBalancer源码

    /**
     * Interface that defines the operations for a software loadbalancer. A typical
     * loadbalancer minimally need a set of servers to loadbalance for, a method to
     * mark a particular server to be out of rotation and a call that will choose a
     * server from the existing list of server.
     * 
     * @author stonse
     * 
     */
    public interface ILoadBalancer {
    
        /**
         * Initial list of servers.
         * This API also serves to add additional ones at a later time
         * The same logical server (host:port) could essentially be added multiple times
         * (helpful in cases where you want to give more "weightage" perhaps ..)
         * 
         * @param newServers new servers to add
         */
        public void addServers(List<Server> newServers);
        
        /**
         * Choose a server from load balancer.
         * 
         * @param key An object that the load balancer may use to determine which server to return. null if 
         *         the load balancer does not use this parameter.
         * @return server chosen
         */
        public Server chooseServer(Object key);
        
        /**
         * To be called by the clients of the load balancer to notify that a Server is down
         * else, the LB will think its still Alive until the next Ping cycle - potentially
         * (assuming that the LB Impl does a ping)
         * 
         * @param server Server to mark as down
         */
        public void markServerDown(Server server);
        
        /**
         * @deprecated 2016-01-20 This method is deprecated in favor of the
         * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
         * and {@link #getAllServers} API (equivalent to availableOnly=false).
         *
         * Get the current list of servers.
         *
         * @param availableOnly if true, only live and available servers should be returned
         */
        @Deprecated
        public List<Server> getServerList(boolean availableOnly);
    
        /**
         * @return Only the servers that are up and reachable.
         */
        public List<Server> getReachableServers();
    
        /**
         * @return All known servers, both reachable and unreachable.
         */
        public List<Server> getAllServers();
    }
    
    

    翻译注释“Interface that defines the operations for a software loadbalancer.”它定义软负载均衡器操作的接口规范
    addServers:初始化服务列表
    chooseServer:选择一个服务
    markServerDown:标记服务下线,从服务列表中移除
    getReachableServers:获取可访问的服务
    getAllServers:获取所有的服务

    而默认情况下这里默认使用的是 ZoneAwareLoadBalancer,断点跟踪或者查看RibbonClientConfiguration配置类的ribbonLoadBalancer方法得知

        @Bean
        @ConditionalOnMissingBean
        public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
            if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
                return this.propertiesFactory.get(ILoadBalancer.class, config, name);
            }
            return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                    serverListFilter, serverListUpdater);
        }
    

    那么我们继续跟踪 RibbonLoadBalancerClient.execute中的 getServer(loadBalancer);代码这里是在根据负载均衡器选择将要执行调用的服务,跟踪下去
    com.netflix.loadbalancer.ZoneAwareLoadBalancer#chooseServer

       @Override
        public Server chooseServer(Object key) {
            if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
                logger.debug("Zone aware logic disabled or there is only one zone");
                return super.chooseServer(key);
            }
            Server server = null;
            try {
                LoadBalancerStats lbStats = getLoadBalancerStats();
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (triggeringLoad == null) {
                    triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscov
    

    这里的key接受了一个默认值"default",因为我们的可用的区域只有一个,所以这里调用了super.chooseServer即com.netflix.loadbalancer.BaseLoadBalancer#chooseServer

     public Server chooseServer(Object key) {
            if (counter == null) {
                counter = createCounter();
            }
            counter.increment();
            if (rule == null) {
                return null;
            } else {
                try {
                    return rule.choose(key);
                } catch (Exception e) {
                    logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                    return null;
                }
            }
        }
    

    继续跟踪 rule.choose(key); 这里调用了 com.netflix.loadbalancer.PredicateBasedRule#choose 的服务选择方法,

    /**
     * A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.
     * After filtering, a server is returned from filtered list in a round robin fashion.
     * 
     * 
     * @author awang
     *
     */
    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
       
        /**
         * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
         * 
         */
        public abstract AbstractServerPredicate getPredicate();
            
        /**
         * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
         * The performance for this method is O(n) where n is number of servers to be filtered.
         */
        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }       
        }
    }
    
    

    PredicateBasedRule本身是一个抽象策略,继承自ClientConfigEnabledRoundRobinRule ,而ClientConfigEnabledRoundRobinRule是一个实现了轮询策略的客户端配置

    
    /**
    RoundRobinRule :轮询
     * This class essentially contains the RoundRobinRule class defined in the
     * loadbalancer package
     * 
     * @author stonse
     * 
     */
    public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
        //轮询策略
        RoundRobinRule roundRobinRule = new RoundRobinRule();
    

    继续跟踪chooseRoundRobinAfterFiltering方法

     /**                                                                                                                                         
      * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.                      
      */                                                                                                                                         
     public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {                                      
         List<Server> eligible = getEligibleServers(servers, loadBalancerKey);                                                                   
         if (eligible.size() == 0) {                                                                                                             
             return Optional.absent();                                                                                                           
         }                                                                                                                                       
         return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));                                                               
     }                                                                                                                                           
                                                                                                                                                 
    

    翻译方法注释“ Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.”告诉我们这里是把过滤后的服务进行轮询选择 ,再看代码 List<Server> eligible获取到合格的服务器 ,incrementAndGetModulo就是在以轮询的方式获取到服务的索引。
    继续往下回到 RibbonLoadBalancerClient#execute方法

        @Override
        public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
            ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
            Server server = getServer(loadBalancer);
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
            RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
                    serviceId), serverIntrospector(serviceId).getMetadata(server));
    
            return execute(serviceId, ribbonServer, request);
        }
    

    这里拿到要调用的服务的实例的后,把服务名,服务实例等信息包装到 RibbonServer对象中

    public static class RibbonServer implements ServiceInstance {
            private final String serviceId;
            private final Server server;
            private final boolean secure;
            private Map<String, String> metadata;
    
            public RibbonServer(String serviceId, Server server) {
                this(serviceId, server, false, Collections.<String, String> emptyMap());
            }
    
            public RibbonServer(String serviceId, Server server, boolean secure,
                    Map<String, String> metadata) {
                this.serviceId = serviceId;
                this.server = server;
                this.secure = secure;
                this.metadata = metadata;
            }
    .....
    
    public interface ServiceInstance {
    
        /**
         * @return the service id as registered.
         */
        String getServiceId();
    
        /**
         * @return the hostname of the registered ServiceInstance
         */
        String getHost();
    
        /**
         * @return the port of the registered ServiceInstance
         */
        int getPort();
    
        /**
         * @return if the port of the registered ServiceInstance is https or not
         */
        boolean isSecure();
    
        /**
         * @return the service uri address
         */
        URI getUri();
    
        /**
         * @return the key value pair metadata associated with the service instance
         */
        Map<String, String> getMetadata();
    
        /**
         * @return the scheme of the instance
         */
        default String getScheme() {
            return null;
        }
    }
    
    

    RibbonServer 实现了 ServiceInstance接口, ServiceInstance本身就是对服务实例的规范,有获取服务id,主机,端口等方法。

    然后继续调用 excute方法执行请求

    @Override
        public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
            Server server = null;
            if(serviceInstance instanceof RibbonServer) {
                server = ((RibbonServer)serviceInstance).getServer();
            }
            if (server == null) {
                throw new IllegalStateException("No instances available for " + serviceId);
            }
    
            RibbonLoadBalancerContext context = this.clientFactory
                    .getLoadBalancerContext(serviceId);
            RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
    
            try {
                T returnVal = request.apply(serviceInstance);
                statsRecorder.recordStats(returnVal);
                return returnVal;
            }
            // catch IOException and rethrow so RestTemplate behaves correctly
            ......
    
    

    看代码 T returnVal = request.apply(serviceInstance);
    这里最终会调用InterceptingClientHttpRequest.InterceptingRequestExecution的execute方法,传入服务实例,执行请求,拿到返回结果。

    private class InterceptingRequestExecution implements ClientHttpRequestExecution {
    
            private final Iterator<ClientHttpRequestInterceptor> iterator;
    
            public InterceptingRequestExecution() {
                this.iterator = interceptors.iterator();
            }
    
            @Override
            public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
                if (this.iterator.hasNext()) {
                    ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
                    return nextInterceptor.intercept(request, body, this);
                }
                else {
                    HttpMethod method = request.getMethod();
                    Assert.state(method != null, "No standard HTTP method");
                    ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
                    request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
                    if (body.length > 0) {
                        if (delegate instanceof StreamingHttpOutputMessage) {
                            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                            streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
                        }
                        else {
                            StreamUtils.copy(body, delegate.getBody());
                        }
                    }
                    return delegate.execute();
                }
            }
        }
    

    跟踪 requestFactory.createRequest方法

    @Override
        public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
            HttpURLConnection connection = openConnection(uri.toURL(), this.proxy);
            prepareConnection(connection, httpMethod.name());
    
            if (this.bufferRequestBody) {
                return new SimpleBufferingClientHttpRequest(connection, this.outputStreaming);
            }
            else {
                return new SimpleStreamingClientHttpRequest(connection, this.chunkSize, this.outputStreaming);
            }
        }
    .....
    
        protected HttpURLConnection openConnection(URL url, @Nullable Proxy proxy) throws IOException {
            URLConnection urlConnection = (proxy != null ? url.openConnection(proxy) : url.openConnection());
            if (!HttpURLConnection.class.isInstance(urlConnection)) {
                throw new IllegalStateException("HttpURLConnection required for [" + url + "] but got: " + urlConnection);
            }
            return (HttpURLConnection) urlConnection;
        }
    

    到这里我们看到他是通过 URLConnection 来调用远程服务的。。。
    好吧总结一下,这个远程服务调用的背后到底做了哪些事情呢??

    1.@LoadBalanced开启了RibbonLoadBalancerClient负载均衡支持

    2.RestTemplate对服务的地址(Uri),主机(host),端口(port)等做了一些描述,然后创建了 InterceptingClientHttpRequest http请求的客户端对象,用来执行请求用,

    3.当调用RestTemplate发起请求时会被 LoadBalancerInterceptor请求拦截器给拦截到

    4.拦截器中使用了 RibbonLoadBalancerClient执行请求,然后根据服务id获取了负载均衡器,默认 ZoneAwareLoadBalancer

    5.然后负载均衡器进行服务的选择,默认使用了轮询策略

    6.拿到服务实例后调用 InterceptingClientHttpRequest 完成服务调用请求,获取返回结果。

    想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

    相关文章

      网友评论

          本文标题:SpringCloud-源码分析 Ribbon

          本文链接:https://www.haomeiwen.com/subject/tdsncqtx.html