美文网首页
spring cloud构建互联网分布式微服务云平台- Ribb

spring cloud构建互联网分布式微服务云平台- Ribb

作者: IT小跑兵 | 来源:发表于2019-02-22 09:27 被阅读1次

    Ribbon 是netflix 公司开源的基于客户端的负载均衡组件,是Spring Cloud大家庭中非常重要的一个模块;Ribbon应该也是整个大家庭中相对而言比较复杂的模块,直接影响到服务调度的质量和性能。

    全面掌握Ribbon可以帮助我们了解在分布式微服务集群工作模式下,服务调度应该考虑到的每个环节。

    本文将详细地剖析Ribbon的设计原理,帮助大家对Spring Cloud 有一个更好的认知。 愿意了解源码的朋友直接求求交流分享技术 一七九一七四三三八零

    一. Spring集成下的Ribbon工作结构
    先贴一张总览图,说明一下Spring如何集成Ribbon的,如下所示:


    2.jpg

    Spring Cloud集成模式下的Ribbon有以下几个特征:

    Ribbon 服务配置方式

    每一个服务配置都有一个Spring ApplicationContext上下文,用于加载各自服务的实例。

    和Feign的集成模式

    在使用Feign作为客户端时,最终请求会转发成 http://<服务名称>/的格式,通过LoadBalancerFeignClient,
    提取出服务标识<服务名称>,然后根据服务名称在上下文中查找对应服务的负载均衡器FeignLoadBalancer,负载均衡器负责根据既有的服务实例的统计信息,挑选出最合适的服务实例

    二、Spring Cloud模式下和Feign的集成实现方式

    和Feign结合的场景下,Feign的调用会被包装成调用请求LoadBalancerCommand,然后底层通过Rxjava基于事件的编码风格,发送请求;Spring Cloud框架通过 Feigin 请求的URL,提取出服务名称,然后在上下文中找到对应服务的的负载均衡器实现FeignLoadBalancer,然后通过负载均衡器中挑选一个合适的Server实例,然后将调用请求转发到该Server实例上,完成调用,在此过程中,记录对应Server实例的调用统计信息。

    /**
         * Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
         * If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
         * function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
         * exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
         * result during execution and retries will be emitted.
         */
        public Observable<T> submit(final ServerOperation<T> operation) {
            final ExecutionInfoContext context = new ExecutionInfoContext();
            
            if (listenerInvoker != null) {
                try {
                    listenerInvoker.onExecutionStart();
                } catch (AbortExecutionException e) {
                    return Observable.error(e);
                }
            }
            
            // 同一Server最大尝试次数
            final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
            //下一Server最大尝试次数
            final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    
            // Use the load balancer
            // 使用负载均衡器,挑选出合适的Server,然后执行Server请求,将请求的数据和行为整合到ServerStats中
            Observable<T> o = 
                    (server == null ? selectServer() : Observable.just(server))
                    .concatMap(new Func1<Server, Observable<T>>() {
                        @Override
                        // Called for each server being selected
                        public Observable<T> call(Server server) {
                            // 获取Server的统计值
                            context.setServer(server);
                            final ServerStats stats = loadBalancerContext.getServerStats(server);
                            
                            // Called for each attempt and retry 服务调用
                            Observable<T> o = Observable
                                    .just(server)
                                    .concatMap(new Func1<Server, Observable<T>>() {
                                        @Override
                                        public Observable<T> call(final Server server) {
                                            context.incAttemptCount();//重试计数
                                            loadBalancerContext.noteOpenConnection(stats);//链接统计
                                            
                                            if (listenerInvoker != null) {
                                                try {
                                                    listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                                } catch (AbortExecutionException e) {
                                                    return Observable.error(e);
                                                }
                                            }
                                            //执行监控器,记录执行时间
                                            final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                            //找到合适的server后,开始执行请求
                                            //底层调用有结果后,做消息处理
                                            return operation.call(server).doOnEach(new Observer<T>() {
                                                private T entity;
                                                @Override
                                                public void onCompleted() {
                                                    recordStats(tracer, stats, entity, null);
                                                    // 记录统计信息
                                                }
    
                                                @Override
                                                public void onError(Throwable e) {
                                                    recordStats(tracer, stats, null, e);//记录异常信息
                                                    logger.debug("Got error {} when executed on server {}", e, server);
                                                    if (listenerInvoker != null) {
                                                        listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                    }
                                                }
    
                                                @Override
                                                public void onNext(T entity) {
                                                    this.entity = entity;//返回结果值
                                                    if (listenerInvoker != null) {
                                                        listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                    }
                                                }                            
                                                
                                                private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                    tracer.stop();//结束计时
                                                    //标记请求结束,更新统计信息
                                                    loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                                }
                                            });
                                        }
                                    });
                            //如果失败,根据重试策略触发重试逻辑
                            // 使用observable 做重试逻辑,根据predicate 做逻辑判断,这里做
                            if (maxRetrysSame > 0) 
                                o = o.retry(retryPolicy(maxRetrysSame, true));
                            return o;
                        }
                    });
             // next请求处理,基于重试器操作   
            if (maxRetrysNext > 0 && server == null) 
                o = o.retry(retryPolicy(maxRetrysNext, false));
            
            return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
                @Override
                public Observable<T> call(Throwable e) {
                    if (context.getAttemptCount() > 0) {
                        if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                            e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                    "Number of retries on next server exceeded max " + maxRetrysNext
                                    + " retries, while making a call for: " + context.getServer(), e);
                        }
                        else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                            e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                    "Number of retries exceeded max " + maxRetrysSame
                                    + " retries, while making a call for: " + context.getServer(), e);
                        }
                    }
                    if (listenerInvoker != null) {
                        listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                    }
                    return Observable.error(e);
                }
            });
        }
    

    从一组ServerList 列表中挑选合适的Server

        /**
         * Compute the final URI from a partial URI in the request. The following steps are performed:
         * <ul>
         * <li>  如果host尚未指定,则从负载均衡器中选定 host/port
         * <li>  如果host 尚未指定并且尚未找到负载均衡器,则尝试从 虚拟地址中确定host/port
         * <li> 如果指定了HOST,并且URI的授权部分通过虚拟地址设置,并且存在负载均衡器,则通过负载就均衡器中确定host/port(指定的HOST将会被忽略)
         * <li> 如果host已指定,但是尚未指定负载均衡器和虚拟地址配置,则使用真实地址作为host
         * <li> if host is missing but none of the above applies, throws ClientException
         * </ul>
         *
         * @param original Original URI passed from caller
         */
        public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
            String host = null;
            int port = -1;
            if (original != null) {
                host = original.getHost();
            }
            if (original != null) {
                Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);        
                port = schemeAndPort.second();
            }
    
            // Various Supported Cases
            // The loadbalancer to use and the instances it has is based on how it was registered
            // In each of these cases, the client might come in using Full Url or Partial URL
            ILoadBalancer lb = getLoadBalancer();
            if (host == null) {
                // 提供部分URI,缺少HOST情况下
                // well we have to just get the right instances from lb - or we fall back
                if (lb != null){
                    Server svc = lb.chooseServer(loadBalancerKey);// 使用负载均衡器选择Server
                    if (svc == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Load balancer does not have available server for client: "
                                        + clientName);
                    }
                    //通过负载均衡器选择的结果中选择host
                    host = svc.getHost();
                    if (host == null){
                        throw new ClientException(ClientException.ErrorType.GENERAL,
                                "Invalid Server for :" + svc);
                    }
                    logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
                    return svc;
                } else {
                    // No Full URL - and we dont have a LoadBalancer registered to
                    // obtain a server
                    // if we have a vipAddress that came with the registration, we
                    // can use that else we
                    // bail out
                    // 通过虚拟地址配置解析出host配置返回
                    if (vipAddresses != null && vipAddresses.contains(",")) {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                "Method is invoked for client " + clientName + " with partial URI of ("
                                + original
                                + ") with no load balancer configured."
                                + " Also, there are multiple vipAddresses and hence no vip address can be chosen"
                                + " to complete this partial uri");
                    } else if (vipAddresses != null) {
                        try {
                            Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
                            host = hostAndPort.first();
                            port = hostAndPort.second();
                        } catch (URISyntaxException e) {
                            throw new ClientException(
                                    ClientException.ErrorType.GENERAL,
                                    "Method is invoked for client " + clientName + " with partial URI of ("
                                    + original
                                    + ") with no load balancer configured. "
                                    + " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
                        }
                    } else {
                        throw new ClientException(
                                ClientException.ErrorType.GENERAL,
                                this.clientName
                                + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
                                + " Also has no vipAddress registered");
                    }
                }
            } else {
                // Full URL Case URL中指定了全地址,可能是虚拟地址或者是hostAndPort
                // This could either be a vipAddress or a hostAndPort or a real DNS
                // if vipAddress or hostAndPort, we just have to consult the loadbalancer
                // but if it does not return a server, we should just proceed anyways
                // and assume its a DNS
                // For restClients registered using a vipAddress AND executing a request
                // by passing in the full URL (including host and port), we should only
                // consult lb IFF the URL passed is registered as vipAddress in Discovery
                boolean shouldInterpretAsVip = false;
    
                if (lb != null) {
                    shouldInterpretAsVip = isVipRecognized(original.getAuthority());
                }
                if (shouldInterpretAsVip) {
                    Server svc = lb.chooseServer(loadBalancerKey);
                    if (svc != null){
                        host = svc.getHost();
                        if (host == null){
                            throw new ClientException(ClientException.ErrorType.GENERAL,
                                    "Invalid Server for :" + svc);
                        }
                        logger.debug("using LB returned Server: {} for request: {}", svc, original);
                        return svc;
                    } else {
                        // just fall back as real DNS
                        logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
                    }
                } else {
                    // consult LB to obtain vipAddress backed instance given full URL
                    //Full URL execute request - where url!=vipAddress
                    logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
                }
            }
            // end of creating final URL
            if (host == null){
                throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
            }
            // just verify that at this point we have a full URL
    
            return new Server(host, port);
        }
    

    相关文章

      网友评论

          本文标题:spring cloud构建互联网分布式微服务云平台- Ribb

          本文链接:https://www.haomeiwen.com/subject/rlrsyqtx.html