美文网首页
(七)对服务列表存储以及更新(ILoadBalancer)

(七)对服务列表存储以及更新(ILoadBalancer)

作者: guessguess | 来源:发表于2021-11-21 11:12 被阅读0次

    在前面已经知道,其实服务列表是可以通过各种方式加载,比如静态的---配置类,或者是写死的集合,动态的---通过注册中心发现服务,而实现这些功能的类是ServerList。至于控制调度的则是ServerListUpdater。这些服务列表,最后会被转化成Server进行存储。
    那么存储到哪里?以及如何被管理
    那么下面说到的类,ILoadBalancer就是用来管理Server的。

    • 1 结构


      结构

      结构其实就比较简单,中间我省略了一些其他的接口,不过实现类就这几个。

    • 2.每个类的作用
    • 2.1 ILoadBalancer
      ILoadBalancer是最顶层的接口,所以看看该接口的定义。
    public interface ILoadBalancer {
        添加服务列表
        public void addServers(List<Server> newServers);
        通过key选择服务
        public Server chooseServer(Object key);
        将某个服务标记为宕机
        public void markServerDown(Server server);
        是否只选择存活的服务
        @Deprecated
        public List<Server> getServerList(boolean availableOnly);
        获取存活的服务列表
        public List<Server> getReachableServers();
        获取所有服务列表
        public List<Server> getAllServers();
    }
    

    从这个接口实现来看,其实是对Server的管理。涵盖了新增,查询,以及修改。至于为什么没有删除,因为ServerListUpdater会定时去更新整个服务列表。

    • 2.2 AbstractLoadBalancer
      新增了俩个抽象方法
      getServerList(ServerGroup serverGroup)--通过服务状态查询服务列表
      getLoadBalancerStats()--获取负载均衡的统计信息
    public abstract class AbstractLoadBalancer implements ILoadBalancer {
        public enum ServerGroup{
            ALL,
            STATUS_UP,
            STATUS_NOT_UP        
        }
        public Server chooseServer() {
            return chooseServer(null);
        }
        public abstract List<Server> getServerList(ServerGroup serverGroup);
        public abstract LoadBalancerStats getLoadBalancerStats();    
    }
    
    • 2.3 BaseLoadBalancer
      BaseLoadBalancer是基本的实现类,把AbstractLoadBalancer 剩余的方法都实现了。
    • 2.3.1 BaseLoadBalancer对于ILoadBalancer接口的实现。
      上面还有PrimeConnectionListener以及IClientConfigAware这2个接口,后续再来仔细看看这2个接口的作用。
    public class BaseLoadBalancer extends AbstractLoadBalancer implements
            PrimeConnections.PrimeConnectionListener, IClientConfigAware {
        这里为什么用线程安全的类,以及可见性,因为本身ServerListUpdater的执行是通过线程池来操作。
        可见性-ServerListUpdater会对allServerList 进行重新赋值。
        synchronizedList-保证对allServerList 集合中的操作都是线程安全的。
        protected volatile List<Server> allServerList = Collections
                .synchronizedList(new ArrayList<Server>());
        
        记录存活的服务列表
        protected volatile List<Server> upServerList = Collections
                .synchronizedList(new ArrayList<Server>());
    
        关于allServerList 的读写锁。因为是对集合内部的元素属性做调整,所以需要加锁。
        本身线程安全的集合只是针对集合本身,与元素的修改无关,所以需要加锁。
        protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
        Ping用于检测服务是否存活
        protected IPing ping = null;
    
    添加服务列表
    -----------------------------------------------------------------------------------------------------------------------------
        @Override
        public void addServers(List<Server> newServers) {
            if (newServers != null && newServers.size() > 0) {
                try {
                    ArrayList<Server> newList = new ArrayList<Server>();
                    newList.addAll(allServerList);
                    newList.addAll(newServers);
                    setServersList(newList);
                } catch (Exception e) {
                    logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
                }
            }
        }
    
    
        public void setServersList(List lsrv) {
            Lock writeLock = allServerLock.writeLock();        
            ArrayList<Server> newServers = new ArrayList<Server>();
            1.上锁
            writeLock.lock();
            try {
                2.将lsrv遍历,判断类型是否正确,最终转换为Server的集合
                ArrayList<Server> allServers = new ArrayList<Server>();
                for (Object server : lsrv) {
                    if (server == null) {
                        continue;
                    }
                    if (server instanceof String) {
                        server = new Server((String) server);
                    }
                    if (server instanceof Server) {
                        allServers.add((Server) server);
                    } else {
                        throw new IllegalArgumentException(
                                "Type String or Server expected, instead found:"
                                        + server.getClass());
                    }
                }
                boolean listChanged = false;
                3.判断是否与原先的服务列表不同,从而决定是否进行修改
                list的equals方法除了比较长度,还会比较元素的内容
                if (!allServerList.equals(allServers)) {
                    listChanged = true;
                    4.在服务列表发生变化的情况下,通过监听器去执行相关操作。
                       后续再细讲。---
                    if (changeListeners != null && changeListeners.size() > 0) {
                       List<Server> oldList = ImmutableList.copyOf(allServerList);
                       List<Server> newList = ImmutableList.copyOf(allServers);                   
                       for (ServerListChangeListener l: changeListeners) {
                           try {
                               l.serverListChanged(oldList, newList);
                           } catch (Exception e) {
                               打印日志。。。省略
                           }
                       }
                    }
                }
                5.检测服务列表的是否准备好服务,其实就是访问服务的真实Ip地址。就看能不能ping的通。
                  如果可以访问server.setReadyToServe(true);
                if (isEnablePrimingConnections()) {
                    for (Server server : allServers) {
                        if (!allServerList.contains(server)) {
                            server.setReadyToServe(false);
                            newServers.add((Server) server);
                        }
                    }
                    if (primeConnections != null) {
                        primeConnections.primeConnectionsAsync(newServers, this);
                    }
                }
                6. 替换原先的allServerList 
                allServerList = allServers;
                7.检测服务是否存活,如果跳过,默认都存活,否则通过Ping这个类,来检测服务是否存活。
                   如果存活就会setAlive(true)
                   通过更新upServerList,记录存活的服务列表
                if (canSkipPing()) {
                    for (Server s : allServerList) {
                        s.setAlive(true);
                    }
                    upServerList = allServerList;
                } else if (listChanged) {
                    forceQuickPing();
                }
            } finally {
                writeLock.unlock();
            }
        }
        
        private boolean canSkipPing() {
            if (ping == null
                    || ping.getClass().getName().equals(DummyPing.class.getName())) {
                // default ping, no need to set up timer
                return true;
            } else {
                return false;
            }
        }
    -------------------------------------------------------------------------------------------------------------------------
    选择服务
        从代码来看,选择服务是通过Rule来进行选择的。同时在选择服务的时候,计数器会计数。
        protected IRule rule = DEFAULT_RULE;
        private final static IRule DEFAULT_RULE = new RoundRobinRule();
        public Server chooseServer(Object key) {
            if (counter == null) {
                counter = createCounter();
            }
            counter.increment();
            if (rule == null) {
                return null;
            } else {
                try {
                    return rule.choose(key);
                } catch (Exception e) {
                    logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                    return null;
                }
            }
        }
    -----------------------------------------------------------------------------------------------------------------------------
    将服务标记为宕机
    
        private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
    
        public void markServerDown(Server server) {
            if (server == null || !server.isAlive()) {
                return;
            }
            server.setAlive(false);
            通过监听器去做些什么~暂时没发现什么实现。
            notifyServerStatusChangeListener(singleton(server));
        }
    }
    
    • 2.4 DynamicServerListLoadBalancer
      DynamicServerListLoadBalancer的大多数实现还是基于BaseLoadBalancer。
      但是与BaseLoadBalancer不同,DynamicServerListLoadBalancer通过DynamicServerList动态的获取服务列表。此外还会对服务列表进行过滤。
      至于获取服务列表的调度则由ServerListUpdater控制。
      源码如下
    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
            @Override
            public void doUpdate() {
                updateListOfServers();
            }
        };
        更新服务列表的标记
        protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
        public void updateListOfServers() {
            List<T> servers = new ArrayList<T>();
            if (serverListImpl != null) {
                servers = serverListImpl.getUpdatedListOfServers();
                LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
    
                if (filter != null) {
                    servers = filter.getFilteredListOfServers(servers);
                    LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);
                }
            }
            updateAllServerList(servers);
        }
    
        protected void updateAllServerList(List<T> ls) {
            由于在线程池中运行,避免资源浪费,以及资源竞争,用cas避免线程安全问题。
            if (serverListUpdateInProgress.compareAndSet(false, true)) {
                try {
                    for (T s : ls) {
                        s.setAlive(true); 
                    }
                    对服务列表进行管理---核心
                    setServersList(ls);
                   更新服务的存活状态
                    super.forceQuickPing();
                } finally {
                    修改表计,改为暂停
                    serverListUpdateInProgress.set(false);
                }
            }
        }
    
    }
    

    上面可以看到,首先DynamicServerListLoadBalancer是动态的获取服务列表。其次,也支持对服务列表的过滤。那么在对于一些基础实现上是否存在不同。比如对于服务列表的管理。那么接下来继续看源码。

    public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
        @Override
        public void setServersList(List lsrv) {
            这里是先调用了BaseLoadBalancer的setServersList方法。
            实现了服务列表基本的存储,以及服务状态更新。
            super.setServersList(lsrv);
            List<T> serverList = (List<T>) lsrv;
            用于存储将服务列表分区后的数据
            Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
            遍历,对每个服务进行处理,设置分区信息
            for (Server server : serverList) {
                生成该服务的统计信息,利用了CacheLoader
                getLoadBalancerStats().getSingleServerStat(server);
                默认都是"defaultZone"
                String zone = server.getZone();
                if (zone != null) {
                    zone = zone.toLowerCase();
                    List<Server> servers = serversInZones.get(zone);
                    if (servers == null) {
                        servers = new ArrayList<Server>();
                        serversInZones.put(zone, servers);
                    }
                    servers.add(server);
                }
            }
            生成区域与服务列表的映射,存放在LoadBalancerStats中。
            setServerListForZones(serversInZones);
        }
    
        protected void setServerListForZones(
                Map<String, List<Server>> zoneServersMap) {
            getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
        }
    }
    

    从上面这段代码来看,其实就是简单做了一个分区,将服务列表分区之后,进行存储。
    但是存储的内容如何被利用?在ZoneAwareLoadBalancer中就可以知道了。

    • 2.5 ZoneAwareLoadBalancer
      其实从这个负载均衡器的名字来看,其实就是跟分区相关的负载均衡器。
      核心功能就是选择合适的分区(根据各个分区的统计指标),然后选择对应的负载均衡器,去选择对应的服务。
      源码如下
    public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
        用于存储分区与负载均衡器的映射
        private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
    ---------------------------------------------------------------------------------------------------------------------------------
    对服务列表的处理
            1.服务列表分区后,存储到LoadBalancerStats
            2.给分区分配负载均衡器,同时给负载均衡器分配规则
    
        覆写父类DynamicServerListLoadBalancer的setServerListForZones方法。
        @Override
        protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
            调回DynamicServerListLoadBalancer的setServerListForZones方法
            目的就是将zoneServersMap存储到LoadBalancerStats中,对分区信息进行存储。
            super.setServerListForZones(zoneServersMap);
            一开始balancers 只会为null。
            if (balancers == null) {
                balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
            }
            给对应的分区分配负载均衡器
            for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
                String zone = entry.getKey().toLowerCase();
                分配负载均衡器,同时对服务列表进行存储
                getLoadBalancer(zone).setServersList(entry.getValue());
            }
            如果新的分区列表中,不包含原先的分区,则将该分区对应的服务列表清空
            for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
                if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                    existingLBEntry.getValue().setServersList(Collections.emptyList());
                }
            }
        }
    
        BaseLoadBalancer getLoadBalancer(String zone) {
            zone = zone.toLowerCase();
            BaseLoadBalancer loadBalancer = balancers.get(zone);
            如果该区域没有负载均衡器,则重新分配,类型为BaseLoadBalancer
            if (loadBalancer == null) {
                分配规则---这个规则其实就是筛选服务的规则
                IRule rule = cloneRule(this.getRule());
                loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
                BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
                if (prev != null) {
                    loadBalancer = prev;
                }
            } 
            return loadBalancer;        
        }
    -------------------------------------------------------------------------------------------------------------------------------
    那么如何筛选?
           1.如果存活的分区小1的情况下,会直接使用规则筛选出服务。
           2.利用统计数据,筛选出合适的区域,最后在该区域的服务列表进行筛选。
        @Override
        public Server chooseServer(Object key) {
            分区为小于1的情况下,直接用规则进行筛选。
            if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
                logger.debug("Zone aware logic disabled or there is only one zone");
                return super.chooseServer(key);
            }
            Server server = null;
            try {
                LoadBalancerStats lbStats = getLoadBalancerStats();
                Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
                logger.debug("Zone snapshots: {}", zoneSnapshot);
                if (triggeringLoad == null) {
                    triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
                }
    
                if (triggeringBlackoutPercentage == null) {
                    triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                            "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
                }
                Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
                if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                    String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                    if (zone != null) {
                        BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                        选出合适的区域,通过该区域对于的负载均衡器去选择服务。
                        该区域的服务列表,存储在该负载均衡器
                        server = zoneLoadBalancer.chooseServer(key);
                    }
                }
            } catch (Exception e) {
                logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
            }
            if (server != null) {
                return server;
            } else {
                logger.debug("Zone avoidance logic is not invoked.");
                return super.chooseServer(key);
            }
        }
    
    }
    

    流程图如下


    服务列表的存储流程

    相关文章

      网友评论

          本文标题:(七)对服务列表存储以及更新(ILoadBalancer)

          本文链接:https://www.haomeiwen.com/subject/bqfktrtx.html