美文网首页
SpringCloud升级之路2020.0.x版-23.订制Sp

SpringCloud升级之路2020.0.x版-23.订制Sp

作者: 干货满满张哈希 | 来源:发表于2021-08-28 08:46 被阅读0次
    image

    本系列代码地址:https://github.com/HashZhang/spring-cloud-scaffold/tree/master/spring-cloud-iiford

    我们使用 Spring Cloud 官方推荐的 Spring Cloud LoadBalancer 作为我们的客户端负载均衡器。上一节我们了解了 Spring Cloud LoadBalancer 的结构,接下来我们来说一下我们在使用 Spring Cloud LoadBalancer 要实现的功能:

    1. 我们要实现不同集群之间不互相调用,通过实例的metamap中的zone配置,来区分不同集群的实例。只有实例的metamap中的zone配置一样的实例才能互相调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现
    2. 负载均衡的轮询算法,需要请求与请求之间隔离,不能共用同一个 position 导致某个请求失败之后的重试还是原来失败的实例。上一节看到的默认的 RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次请求原子加 1。在这种情况下会有问题:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,RoundRobinLoadBalancer 返回实例 1,这时有请求 B 到达,RoundRobinLoadBalancer 返回实例 2。然后如果请求 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是我们期望看到的。

    针对这两个功能,我们分别编写自己的实现。

    image

    Spring Cloud LoadBalancer 中的 zone 配置

    Spring Cloud LoadBalancer 定义了 LoadBalancerZoneConfig

    public class LoadBalancerZoneConfig {
        //标识当前负载均衡器处于哪一个 zone
        private String zone;
        public LoadBalancerZoneConfig(String zone) {
            this.zone = zone;
        }
        public String getZone() {
            return zone;
        }
        public void setZone(String zone) {
            this.zone = zone;
        }
    }
    

    如果没有引入 Eureka 相关依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置:
    LoadBalancerAutoConfiguration

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerZoneConfig zoneConfig(Environment environment) {
        return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
    }
    

    如果引入了 Eureka 相关依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会覆盖 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig

    EurekaLoadBalancerClientConfiguration

    @PostConstruct
    public void postprocess() {
        if (!StringUtils.isEmpty(zoneConfig.getZone())) {
            return;
        }
        String zone = getZoneFromEureka();
        if (!StringUtils.isEmpty(zone)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
            }
            //设置 `LoadBalancerZoneConfig`
            zoneConfig.setZone(zone);
        }
    }
    
    private String getZoneFromEureka() {
        String zone;
        //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true
        boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
        //如果配置了,则尝试从 Eureka 配置的 host 名称中提取
        //实际就是以 . 分割 host,然后第二个就是 zone
        //例如 www.zone1.com 就是 zone1
        if (approximateZoneFromHostname && eurekaConfig != null) {
            return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
        }
        else {
            //否则,从 metadata map 中取 zone 这个 key
            zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
            //如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为当前 zone
            if (StringUtils.isEmpty(zone) && clientConfig != null) {
                String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
                // Pick the first one from the regions we want to connect to
                zone = zones != null && zones.length > 0 ? zones[0] : null;
            }
            return zone;
        }
    }
    

    实现 SameZoneOnlyServiceInstanceListSupplier

    为了实现通过 zone 来过滤同一 zone 下的实例,并且绝对不会返回非同一 zone 下的实例,我们来编写代码:

    SameZoneOnlyServiceInstanceListSupplier

    /**
     * 只返回与当前实例同一个 Zone 的服务实例,不同 zone 之间的服务不互相调用
     */
    public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
        /**
         * 实例元数据 map 中表示 zone 配置的 key
         */
        private final String ZONE = "zone";
        /**
         * 当前 spring cloud loadbalancer 的 zone 配置
         */
        private final LoadBalancerZoneConfig zoneConfig;
        private String zone;
    
        public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
            super(delegate);
            this.zoneConfig = zoneConfig;
        }
    
        @Override
        public Flux<List<ServiceInstance>> get() {
            return getDelegate().get().map(this::filteredByZone);
        }
    
        //通过 zoneConfig 过滤
        private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
            if (zone == null) {
                zone = zoneConfig.getZone();
            }
            if (zone != null) {
                List<ServiceInstance> filteredInstances = new ArrayList<>();
                for (ServiceInstance serviceInstance : serviceInstances) {
                    String instanceZone = getZone(serviceInstance);
                    if (zone.equalsIgnoreCase(instanceZone)) {
                        filteredInstances.add(serviceInstance);
                    }
                }
                if (filteredInstances.size() > 0) {
                    return filteredInstances;
                }
            }
            /**
             * @see ZonePreferenceServiceInstanceListSupplier 在没有相同zone实例的时候返回的是所有实例
             * 我们这里为了实现不同 zone 之间不互相调用需要返回空列表
             */
            return List.of();
        }
    
        //读取实例的 zone,没有配置则为 null
        private String getZone(ServiceInstance serviceInstance) {
            Map<String, String> metadata = serviceInstance.getMetadata();
            if (metadata != null) {
                return metadata.get(ZONE);
            }
            return null;
        }
    }
    
    image

    在之前章节的讲述中,我们提到了我们使用 spring-cloud-sleuth 作为链路追踪库。我们想可以通过其中的 traceId,来区分究竟是否是同一个请求。

    RoundRobinWithRequestSeparatedPositionLoadBalancer

    //一定必须是实现ReactorServiceInstanceLoadBalancer
    //而不是ReactorLoadBalancer<ServiceInstance>
    //因为注册的时候是ReactorServiceInstanceLoadBalancer
    @Log4j2
    public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
        private final ServiceInstanceListSupplier serviceInstanceListSupplier;
        //每次请求算上重试不会超过1分钟
        //对于超过1分钟的,这种请求肯定比较重,不应该重试
        private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
                //随机初始值,防止每次都是从第一个开始调用
                .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
        private final String serviceId;
        private final Tracer tracer;
    
    
        public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
            this.serviceInstanceListSupplier = serviceInstanceListSupplier;
            this.serviceId = serviceId;
            this.tracer = tracer;
        }
    
        @Override
        public Mono<Response<ServiceInstance>> choose(Request request) {
            return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
        }
    
        private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
            if (serviceInstances.isEmpty()) {
                log.warn("No servers available for service: " + this.serviceId);
                return new EmptyResponse();
            }
            return getInstanceResponseByRoundRobin(serviceInstances);
        }
    
        private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
            if (serviceInstances.isEmpty()) {
                log.warn("No servers available for service: " + this.serviceId);
                return new EmptyResponse();
            }
            //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
            Span currentSpan = tracer.currentSpan();
            if (currentSpan == null) {
                currentSpan = tracer.newTrace();
            }
            long l = currentSpan.context().traceId();
            AtomicInteger seed = positionCache.get(l);
            int s = seed.getAndIncrement();
            int pos = s % serviceInstances.size();
            log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
            return new DefaultResponse(serviceInstances.stream()
                    //实例返回列表顺序可能不同,为了保持一致,先排序再取
                    .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                    .collect(Collectors.toList()).get(pos));
        }
    }
    
    image

    在上一节,我们提到了可以通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:

    spring.factories

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration
    

    然后编写这个自动配置类,其实很简单,就是添加一个 @LoadBalancerClients 注解,设置默认配置类:

    LoadBalancerAutoConfiguration

    @Configuration(proxyBeanMethods = false)
    @LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
    public class LoadBalancerAutoConfiguration {
    }
    

    编写这个默认配置类,将上面我们实现的两个类,组装进去:

    DefaultLoadBalancerConfiguration

    @Configuration(proxyBeanMethods = false)
    public class DefaultLoadBalancerConfiguration {
    
        @Bean
        public ServiceInstanceListSupplier serviceInstanceListSupplier(
                DiscoveryClient discoveryClient,
                Environment env,
                ConfigurableApplicationContext context,
                LoadBalancerZoneConfig zoneConfig
        ) {
            ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                    .getBeanProvider(LoadBalancerCacheManager.class);
            return  //开启服务实例缓存
                    new CachingServiceInstanceListSupplier(
                            //只能返回同一个 zone 的服务实例
                            new SameZoneOnlyServiceInstanceListSupplier(
                                    //启用通过 discoveryClient 的服务发现
                                    new DiscoveryClientServiceInstanceListSupplier(
                                            discoveryClient, env
                                    ),
                                    zoneConfig
                            )
                            , cacheManagerProvider.getIfAvailable()
                    );
        }
    
        @Bean
        public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
                Environment environment,
                ServiceInstanceListSupplier serviceInstanceListSupplier,
                Tracer tracer
        ) {
            String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
                    serviceInstanceListSupplier,
                    name,
                    tracer
            );
        }
    }
    

    这样,我们就实现了自定义的负载均衡器。也理解了 Spring Cloud LoadBalancer 的使用。

    image

    我们这一节详细分析在我们项目中使用 Spring Cloud LoadBalancer 要实现的功能,实现了自定义的负载均衡器,也理解了 Spring Cloud LoadBalancer 的使用。下一节我们使用单元测试验证我们要实现的这些功能是否有效。

    相关文章

      网友评论

          本文标题:SpringCloud升级之路2020.0.x版-23.订制Sp

          本文链接:https://www.haomeiwen.com/subject/vrcbiltx.html