在前面已经知道,其实服务列表是可以通过各种方式加载,比如静态的---配置类,或者是写死的集合,动态的---通过注册中心发现服务,而实现这些功能的类是ServerList。至于控制调度的则是ServerListUpdater。这些服务列表,最后会被转化成Server进行存储。
那么存储到哪里?以及如何被管理
那么下面说到的类,ILoadBalancer就是用来管理Server的。
-
1 结构
结构
结构其实就比较简单,中间我省略了一些其他的接口,不过实现类就这几个。
- 2.每个类的作用
- 2.1 ILoadBalancer
ILoadBalancer是最顶层的接口,所以看看该接口的定义。
public interface ILoadBalancer {
添加服务列表
public void addServers(List<Server> newServers);
通过key选择服务
public Server chooseServer(Object key);
将某个服务标记为宕机
public void markServerDown(Server server);
是否只选择存活的服务
@Deprecated
public List<Server> getServerList(boolean availableOnly);
获取存活的服务列表
public List<Server> getReachableServers();
获取所有服务列表
public List<Server> getAllServers();
}
从这个接口实现来看,其实是对Server的管理。涵盖了新增,查询,以及修改。至于为什么没有删除,因为ServerListUpdater会定时去更新整个服务列表。
- 2.2 AbstractLoadBalancer
新增了俩个抽象方法
getServerList(ServerGroup serverGroup)--通过服务状态查询服务列表
getLoadBalancerStats()--获取负载均衡的统计信息
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
public Server chooseServer() {
return chooseServer(null);
}
public abstract List<Server> getServerList(ServerGroup serverGroup);
public abstract LoadBalancerStats getLoadBalancerStats();
}
- 2.3 BaseLoadBalancer
BaseLoadBalancer是基本的实现类,把AbstractLoadBalancer 剩余的方法都实现了。 - 2.3.1 BaseLoadBalancer对于ILoadBalancer接口的实现。
上面还有PrimeConnectionListener以及IClientConfigAware这2个接口,后续再来仔细看看这2个接口的作用。
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
这里为什么用线程安全的类,以及可见性,因为本身ServerListUpdater的执行是通过线程池来操作。
可见性-ServerListUpdater会对allServerList 进行重新赋值。
synchronizedList-保证对allServerList 集合中的操作都是线程安全的。
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
记录存活的服务列表
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
关于allServerList 的读写锁。因为是对集合内部的元素属性做调整,所以需要加锁。
本身线程安全的集合只是针对集合本身,与元素的修改无关,所以需要加锁。
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
Ping用于检测服务是否存活
protected IPing ping = null;
添加服务列表
-----------------------------------------------------------------------------------------------------------------------------
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
public void setServersList(List lsrv) {
Lock writeLock = allServerLock.writeLock();
ArrayList<Server> newServers = new ArrayList<Server>();
1.上锁
writeLock.lock();
try {
2.将lsrv遍历,判断类型是否正确,最终转换为Server的集合
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
3.判断是否与原先的服务列表不同,从而决定是否进行修改
list的equals方法除了比较长度,还会比较元素的内容
if (!allServerList.equals(allServers)) {
listChanged = true;
4.在服务列表发生变化的情况下,通过监听器去执行相关操作。
后续再细讲。---
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
打印日志。。。省略
}
}
}
}
5.检测服务列表的是否准备好服务,其实就是访问服务的真实Ip地址。就看能不能ping的通。
如果可以访问server.setReadyToServe(true);
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
6. 替换原先的allServerList
allServerList = allServers;
7.检测服务是否存活,如果跳过,默认都存活,否则通过Ping这个类,来检测服务是否存活。
如果存活就会setAlive(true)
通过更新upServerList,记录存活的服务列表
if (canSkipPing()) {
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
} else if (listChanged) {
forceQuickPing();
}
} finally {
writeLock.unlock();
}
}
private boolean canSkipPing() {
if (ping == null
|| ping.getClass().getName().equals(DummyPing.class.getName())) {
// default ping, no need to set up timer
return true;
} else {
return false;
}
}
-------------------------------------------------------------------------------------------------------------------------
选择服务
从代码来看,选择服务是通过Rule来进行选择的。同时在选择服务的时候,计数器会计数。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
-----------------------------------------------------------------------------------------------------------------------------
将服务标记为宕机
private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
public void markServerDown(Server server) {
if (server == null || !server.isAlive()) {
return;
}
server.setAlive(false);
通过监听器去做些什么~暂时没发现什么实现。
notifyServerStatusChangeListener(singleton(server));
}
}
- 2.4 DynamicServerListLoadBalancer
DynamicServerListLoadBalancer的大多数实现还是基于BaseLoadBalancer。
但是与BaseLoadBalancer不同,DynamicServerListLoadBalancer通过DynamicServerList动态的获取服务列表。此外还会对服务列表进行过滤。
至于获取服务列表的调度则由ServerListUpdater控制。
源码如下
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
更新服务列表的标记
protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
protected void updateAllServerList(List<T> ls) {
由于在线程池中运行,避免资源浪费,以及资源竞争,用cas避免线程安全问题。
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true);
}
对服务列表进行管理---核心
setServersList(ls);
更新服务的存活状态
super.forceQuickPing();
} finally {
修改表计,改为暂停
serverListUpdateInProgress.set(false);
}
}
}
}
上面可以看到,首先DynamicServerListLoadBalancer是动态的获取服务列表。其次,也支持对服务列表的过滤。那么在对于一些基础实现上是否存在不同。比如对于服务列表的管理。那么接下来继续看源码。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
@Override
public void setServersList(List lsrv) {
这里是先调用了BaseLoadBalancer的setServersList方法。
实现了服务列表基本的存储,以及服务状态更新。
super.setServersList(lsrv);
List<T> serverList = (List<T>) lsrv;
用于存储将服务列表分区后的数据
Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
遍历,对每个服务进行处理,设置分区信息
for (Server server : serverList) {
生成该服务的统计信息,利用了CacheLoader
getLoadBalancerStats().getSingleServerStat(server);
默认都是"defaultZone"
String zone = server.getZone();
if (zone != null) {
zone = zone.toLowerCase();
List<Server> servers = serversInZones.get(zone);
if (servers == null) {
servers = new ArrayList<Server>();
serversInZones.put(zone, servers);
}
servers.add(server);
}
}
生成区域与服务列表的映射,存放在LoadBalancerStats中。
setServerListForZones(serversInZones);
}
protected void setServerListForZones(
Map<String, List<Server>> zoneServersMap) {
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}
}
从上面这段代码来看,其实就是简单做了一个分区,将服务列表分区之后,进行存储。
但是存储的内容如何被利用?在ZoneAwareLoadBalancer中就可以知道了。
- 2.5 ZoneAwareLoadBalancer
其实从这个负载均衡器的名字来看,其实就是跟分区相关的负载均衡器。
核心功能就是选择合适的分区(根据各个分区的统计指标),然后选择对应的负载均衡器,去选择对应的服务。
源码如下
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
用于存储分区与负载均衡器的映射
private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
---------------------------------------------------------------------------------------------------------------------------------
对服务列表的处理
1.服务列表分区后,存储到LoadBalancerStats
2.给分区分配负载均衡器,同时给负载均衡器分配规则
覆写父类DynamicServerListLoadBalancer的setServerListForZones方法。
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
调回DynamicServerListLoadBalancer的setServerListForZones方法
目的就是将zoneServersMap存储到LoadBalancerStats中,对分区信息进行存储。
super.setServerListForZones(zoneServersMap);
一开始balancers 只会为null。
if (balancers == null) {
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
给对应的分区分配负载均衡器
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
分配负载均衡器,同时对服务列表进行存储
getLoadBalancer(zone).setServersList(entry.getValue());
}
如果新的分区列表中,不包含原先的分区,则将该分区对应的服务列表清空
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
如果该区域没有负载均衡器,则重新分配,类型为BaseLoadBalancer
if (loadBalancer == null) {
分配规则---这个规则其实就是筛选服务的规则
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
return loadBalancer;
}
-------------------------------------------------------------------------------------------------------------------------------
那么如何筛选?
1.如果存活的分区小1的情况下,会直接使用规则筛选出服务。
2.利用统计数据,筛选出合适的区域,最后在该区域的服务列表进行筛选。
@Override
public Server chooseServer(Object key) {
分区为小于1的情况下,直接用规则进行筛选。
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
选出合适的区域,通过该区域对于的负载均衡器去选择服务。
该区域的服务列表,存储在该负载均衡器
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
}
流程图如下
服务列表的存储流程
网友评论