在前面,负载均衡器里面对于服务的一个选择,其实也是通过规则来控制的。
那么在ribbon里面,也有一些相关的规则。
整体的结构如下。
rule
- 1 各个Rule的实现
- 1.1 IRule接口
public interface IRule{
/*
* choose one alive server from lb.allServers or
* lb.upServers according to key
*
* @return choosen Server object. NULL is returned if none
* server is available
*/
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
接口方法比较简单,根据key过滤服务,以及负载均衡器的set&Get
- 1.2 AbstractLoadBalancerRule
一个比较简单的抽象类,实现了负载均衡器的set&get
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
private ILoadBalancer lb;
@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
- 1.3 RoundRobinRule
通过轮询算法,来选择服务
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
模拟IncreAndGet-轮询算法
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
通过轮询算法选取下标,选择某个服务
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
}
从源码的实现上来看,也是相对比较简单的,用轮询算法来选择某个服务。
- 1.4 ClientConfigEnabledRoundRobinRule
这个类其实是注入客户端配置的一个轮询规则
实现比较简单
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
RoundRobinRule roundRobinRule = new RoundRobinRule();
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
roundRobinRule = new RoundRobinRule();
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
roundRobinRule.setLoadBalancer(lb);
}
@Override
public Server choose(Object key) {
if (roundRobinRule != null) {
通过轮询规则来选择服务
return roundRobinRule.choose(key);
} else {
throw new IllegalArgumentException(
"This class has not been initialized with the RoundRobinRule class");
}
}
}
- 1.5 BestAvailableRule
找到活跃请求数最少的服务,某种意义上来说,活跃请求数意味着这个服务的压力最小(但是也要看这个机器的性能)
源码比较简单
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
private LoadBalancerStats loadBalancerStats;
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
通过服务的统计数据,找到活跃请求数最小的服务
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof AbstractLoadBalancer) {
loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
}
}
}
- 1.6 PredicateBasedRule
这个类其实是一个抽象类,更多是作为一个提供拓展的类。
通过Predicate提供对服务的过滤。
那么先看看Predicate这个类。由于还是有部分实现类的,所以只看接口功能即可。
public interface Predicate<T> {
boolean apply(@Nullable T input);
@Override
boolean equals(@Nullable Object object);
}
Predicate这个类功能其实比较简单,说白了就是用于判断。
那么再来看看PredicateBasedRule
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
public abstract AbstractServerPredicate getPredicate();
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
通过Predicate来过滤服务
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}
- 1.7 ZoneAvoidanceRule
这个rule其实主要是用于选择合适的分区。
结合了轮询算法以及统计信息。
那么如何选择合适的分区,源码如下
public class ZoneAvoidanceRule extends PredicateBasedRule {
该方法的实现核心就是随机选出一个服务。
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
如果分区只有一个,直接返回
if (availableZones.size() == 1) {
return availableZones;
}
最坏的分区集合
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
这里的实现核心其实就是通过统计信息,找出坏的分区,然后移除
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
return availableZones;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
}
- 2 总结一下
ribbon中的rule分别具备以下作用:
RoundRobinRule---轮询的负载均衡
BestAvailableRule---找出活跃请求最小的服务(当前压力最小)
ZoneAvoidanceRule---结合分区统计信息筛选出合适的分区(目前我还是没弄清这些统计数据是如何使用的,等后面再回头看吧)
PredicateBasedRule---具备过滤的功能
网友评论