美文网首页Spring Cloud 架构微服务架构和实践
自定义负载均衡算法实现环境隔离

自定义负载均衡算法实现环境隔离

作者: 罗曼蒂克 | 来源:发表于2019-08-05 21:03 被阅读32次

实际开发过程中的问题:

公共环境中多个人注册在同一个 eureka 集群中,根据负载均衡原则,消费者会获取所有的服务提供者对其中一个发起调用,某用户本来想只调用自己的提供者进行 debug,结果调用窜到了其他人注册的提供者上,影响开发效率.

约定名词:

  1. routingTag: 路由键,根据该值决定请求发送到哪个服务提供者分片.
  2. 分片: 具有相同 routingKe的一组服务,包括提供者和消费者,目的就是想做到消费者和提供者在同一个分片.

方案一

image.png

优点:方便简单

缺点: 1. 每次本地调试都有改动,一旦提交,代码合并冲突,对开发者不友好

方案二

自己定制开发负载均衡策略来实现

优点: 不用改动

缺点: 需要自己开发

为了长远的效率考虑选择方案二

思路
  1. 提供者在配置文件注明自己的分片,消费者也标明自己的分片,
  2. 获取所有提供者的信息,根据分片信息进行分组过滤,获取到同一个分片的一组提供者,对其发起调用

Eureka 已经提供了metadata-map 来自定义元数据:

eureka.instance.metadata-map

实现

  1. 在服务的消费者和提供者的元信息中定义 eureka.instance.metadata-map.routingTag=ABC
    PS 这个ABC可以设置成从机器获得唯一标识。${}
  2. 仿照 com.netflix.loadbalancer.RandomRule继承AbstractLoadBalancerRule
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.ReflectUtil;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.groupingBy;

public class TagRule extends AbstractLoadBalancerRule {
    @Autowired
    DiscoveryClient discoveryClient;
    @Value("${eureka.instance.metadata-map.routingTag}")
    String routingTag;


    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {

    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
    }

    @Override
    public Server choose(Object key) {
        ILoadBalancer loadBalancer = getLoadBalancer();
        if (loadBalancer == null) {
            return null;
        }
        Server resultServer = null;

        while (resultServer == null) {
            if (Thread.interrupted()) {
                return null;
            }
            //获取所有服务提供者信息
            List<Server> allServers = loadBalancer.getAllServers();
            if (allServers.size() == 0) {
                return null;
            } else {
                resultServer = allServers.get(0);
                //获取提供者的实例信息
                List<ServiceInstance> instances = discoveryClient.getInstances(resultServer.getMetaInfo().getServiceIdForDiscovery());
                //根据实例中 metadata.routingTag进行分组
                Map<String, List<ServiceInstance>> collect = instances.stream().collect(groupingBy(s -> s.getMetadata().get("routingTag")));
                //获取与当前消费者在同一个分片的服务生产者 by routingTag
                List<ServiceInstance> serviceInstances = collect.get(routingTag);
                List<Server> sameTagServers = new ArrayList<>();
                for (ServiceInstance serviceInstance : serviceInstances) {
                    for (Server server : allServers) {
                        InstanceInfo instanceInfo = (InstanceInfo) ReflectUtil.getFieldValue(server, "instanceInfo");
                        if (instanceInfo.getInstanceId().equalsIgnoreCase(((EurekaDiscoveryClient.EurekaServiceInstance) serviceInstance).getInstanceInfo().getInstanceId())) {
                            sameTagServers.add(server);
                        }
                    }
                }
                //随机选出一个
                if (CollUtil.isNotEmpty(sameTagServers)) {
                    resultServer = sameTagServers.get(RandomUtil.randomInt(0,sameTagServers.size()));
                }

            }
            if (resultServer == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (resultServer.isAlive()) {
                return (resultServer);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            resultServer = null;
            Thread.yield();
        }
        System.out.println("选中的服务是:" + resultServer);
        return resultServer;
    }

}


  1. 配置中启用该 rule

    @Bean
    @Scope("prototype")//这个注解不能少,否则快速调用几次就会出现com.netflix.client.ClientException: Load balancer does not have available server for client 异常
    public IRule tagRule() {
        return new TagRule();
    }

  1. 测试,服务提供者配置一个和消费者相同的routingTag,服务提供者再配置多个不同的进行验证
image.png
image.png

如图:
user是服务提供者,
message 是服务消费者.
8000是 eureka
9001的 routingTag 是 ABC
9999和 9998 的 routingTag 是 www
8002的 routingTag 是 www
8001是 为未启用 TagRule
效果如下:tab1 是 8002 端口,tab2 是 8001 端口

load-balancer.gif

不完善的地方就是同一个分片中的服务是随机选择的,不过一般用于调试也不会在本地启用很多实例来搞自己.
下一篇解决一下 MQ 乱窜的问题.

相关文章

网友评论

    本文标题:自定义负载均衡算法实现环境隔离

    本文链接:https://www.haomeiwen.com/subject/zylvdctx.html