从前面可以知道,ribbon会通过统计数据,找到合适的分区,再找到分区对应负载均衡器,通过负载均衡器中的服务列表,对服务进行匹配,最后找到对应的服务。
那么下面先来说说统计数据
-
1统计数据
-
1.1统计数据的结构
image.png
-
1.1.1 DistributionMBean
这个类的源码比较简单,如下
public interface DistributionMBean {
void clear();清理分布数据,重置到初始状态
long getNumValues();数据的个数
double getMean();平均值
double getVariance();方差
double getStdDev();标准差
double getMinimum();最小值
double getMaximum();最大值
}
- 1.1.2 DataCollector
这个类的源码也比较简单
public interface DataCollector {
void noteValue(double val);
}
这个类的接口比较简单,主要是用于数据采集。
- 1.1.3 Distribution
分布数据的基本实现
源码如下
public class Distribution implements DistributionMBean, DataCollector {
private long numValues;值的数量
private double sumValues;值的总和
private double sumSquareValues;值的平方总和
private double minValue;值的最小值
private double maxValue;值的最大值
初始化
public Distribution() {
numValues = 0L;
sumValues = 0.0;
sumSquareValues = 0.0;
minValue = 0.0;
maxValue = 0.0;
}
进行值的存储
public void noteValue(double val) {
numValues++;
sumValues += val;
sumSquareValues += val * val;
if (numValues == 1) {
minValue = val;
maxValue = val;
} else if (val < minValue) {
minValue = val;
} else if (val > maxValue) {
maxValue = val;
}
}
清除数据
public void clear() {
numValues = 0L;
sumValues = 0.0;
sumSquareValues = 0.0;
minValue = 0.0;
maxValue = 0.0;
}
获取值的数量
public long getNumValues() {
return numValues;
}
获取平均值
public double getMean() {
if (numValues < 1) {
return 0.0;
} else {
return sumValues / numValues;
}
}
方差
public double getVariance() {
if (numValues < 2) {
return 0.0;
} else if (sumValues == 0.0) {
return 0.0;
} else {
double mean = getMean();
return (sumSquareValues / numValues) - mean * mean;
}
}
标准差
public double getStdDev() {
return Math.sqrt(getVariance());
}
获取最大值
public double getMinimum() {
return minValue;
}
获取最小值
public double getMaximum() {
return maxValue;
}
合并另外一个分布数据
public void add(Distribution anotherDistribution) {
if (anotherDistribution != null) {
numValues += anotherDistribution.numValues;
sumValues += anotherDistribution.sumValues;
sumSquareValues += anotherDistribution.sumSquareValues;
minValue = (minValue < anotherDistribution.minValue) ? minValue
: anotherDistribution.minValue;
maxValue = (maxValue > anotherDistribution.maxValue) ? maxValue
: anotherDistribution.maxValue;
}
}
}
- 1.1.4 DataBuffer
dataBuffer是用于存储数据的,从名字看,数据缓冲区。
源码也比较简单。代码如下
public class DataBuffer extends Distribution {
private final Lock lock;锁
private final double[] buf;缓冲区
private long startMillis;开始时间
private long endMillis;结束时间
private int size;数据量的个数
private int insertPos;插入的位置
构造方法,初始化,容量默认为1000
public DataBuffer(int capacity) {
lock = new ReentrantLock();
buf = new double[capacity];
startMillis = 0;
size = 0;
insertPos = 0;
}
public Lock getLock() {
return lock;
}
public int getCapacity() {
return buf.length;
}
public long getSampleIntervalMillis() {
return (endMillis - startMillis);
}
public int getSampleSize() {
return size;
}
清除缓冲区
@Override
public void clear() {
同时将统计数据清除
super.clear();
startMillis = 0;
size = 0;
insertPos = 0;
}
开始收集
public void startCollection() {
清除数据,初始化
clear();
startMillis = System.currentTimeMillis();
}
结束收集
public void endCollection() {
endMillis = System.currentTimeMillis();
Arrays.sort(buf, 0, size);
}
记录数据
@Override
public void noteValue(double val) {
父类中有一些指标,需要记录,如最大值,最小值,总平方,值的数量,总值等等
super.noteValue(val);
位置后移,保存到数组中
buf[insertPos++] = val;
如果超过缓存区长度,从头开始覆盖,size变成缓存区长度,从头开始覆盖
if (insertPos >= buf.length) {
insertPos = 0;
size = buf.length;
} else if (insertPos > size) {
size = insertPos;
}
}
取百分之a,到百分之b范围内的缓冲数据
public double[] getPercentiles(double[] percents, double[] percentiles) {
for (int i = 0; i < percents.length; i++) {
percentiles[i] = computePercentile(percents[i]);
}
return percentiles;
}
private double computePercentile(double percent) {
if (size <= 0) {
return 0.0;
} else if (percent <= 0.0) {
return buf[0];
} else if (percent >= 100.0) {
return buf[size - 1];
}
double index = (percent / 100.0) * size;
int iLow = (int) Math.floor(index);
int iHigh = (int) Math.ceil(index);
assert 0 <= iLow && iLow <= index && index <= iHigh && iHigh <= size;
assert (iHigh - iLow) <= 1;
if (iHigh >= size) {
return buf[size - 1];
} else if (iLow == iHigh) {
return buf[iLow];
} else {
return buf[iLow] + (index - iLow) * (buf[iHigh] - buf[iLow]);
}
}
}
- 1.1.15 DataAccumulator
数据积累器, 其实就是数据采集器的一个基本实现。
public abstract class DataAccumulator implements DataCollector {
private DataBuffer current;当前采集的数据
private DataBuffer previous;之前采集的数据
private final Object swapLock = new Object();交换锁
初始化构造方法
public DataAccumulator(int bufferSize) {
this.current = new DataBuffer(bufferSize);
this.previous = new DataBuffer(bufferSize);
}
采集数据,保存到当前的缓冲区中
public void noteValue(double val) {
synchronized (swapLock) {
Lock l = current.getLock();
l.lock();
try {
current.noteValue(val);
} finally {
l.unlock();
}
}
}
推送数据
public void publish() {
DataBuffer tmp = null;
Lock l = null;
synchronized (swapLock) {
将当前的数据,保存到previous
tmp = current;
current = previous;
previous = tmp;
l = current.getLock();
l.lock();
try {
清除current的数据,开始收集---这里感觉是比较聪明的做法,不用来回new。
current.startCollection();
} finally {
l.unlock();
}
l = tmp.getLock();
l.lock();
}
try {
结束收集
tmp.endCollection();
推送当前统计数据
publish(tmp);
} finally {
l.unlock();
}
}
暴露给子类去实现的一个推送数据接口,DataDistribution进行了覆写,
protected abstract void publish(DataBuffer buf);
}
- 1.1.16 DataDistribution
这个类其实与DataBuffer相似,但是从结构上来说,还实现了数据采集器的功能。
public class DataDistribution extends DataAccumulator implements DataDistributionMBean {
private long numValues = 0L;总数据样本数(有效+无效,超出缓存区的就是无效)
private double mean = 0.0;平均值
private double variance = 0.0;方差
private double stddev = 0.0;标准差
private double min = 0.0;最小值
private double max = 0.0;最大值
private long ts = 0L;
private long interval = 0L;间隔
private int size = 0;有效样品数
private final double[] percents;
private final double[] percentiles;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP2")
public DataDistribution(int bufferSize, double[] percents) {
初始化父类,DataAccumulator的缓存区(this.current,this.previous)
super(bufferSize);
assert percentsOK(percents);
this.percents = percents;
this.percentiles = new double[percents.length];
}
private static boolean percentsOK(double[] percents) {
if (percents == null) {
return false;
}
for (int i = 0; i < percents.length; i++) {
if (percents[i] < 0.0 || percents[i] > 100.0) { // SUPPRESS CHECKSTYLE MagicNumber
return false;
}
}
return true;
}
protected void publish(DataBuffer buf) {
将统计数据赋值到成员变量中
ts = System.currentTimeMillis();
numValues = buf.getNumValues();
mean = buf.getMean();
variance = buf.getVariance();
stddev = buf.getStdDev();
min = buf.getMinimum();
max = buf.getMaximum();
interval = buf.getSampleIntervalMillis();
size = buf.getSampleSize();
buf.getPercentiles(percents, percentiles);
}
/** {@inheritDoc} */
public void clear() {
numValues = 0L;
mean = 0.0;
variance = 0.0;
stddev = 0.0;
min = 0.0;
max = 0.0;
ts = 0L;
interval = 0L;
size = 0;
for (int i = 0; i < percentiles.length; i++) {
percentiles[i] = 0.0;
}
}
public long getNumValues() {
return numValues;
}
public double getMean() {
return mean;
}
public double getVariance() {
return variance;
}
/** {@inheritDoc} */
public double getStdDev() {
return stddev;
}
public double getMinimum() {
return min;
}
public double getMaximum() {
return max;
}
public String getTimestamp() {
return new Date(getTimestampMillis()).toString();
}
public long getTimestampMillis() {
return ts;
}
public long getSampleIntervalMillis() {
return interval;
}
public int getSampleSize() {
return size;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP")
public double[] getPercents() {
return percents;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EI_EXPOSE_REP")
public double[] getPercentiles() {
return percentiles;
}
}
- 1.1.17 DataPublisher
前面的一些类,都是用于数据存储。那么哪个类调度数据的更新?
public class DataPublisher {
private static final String THREAD_NAME = "DataPublisher";
private static final boolean DAEMON_THREADS = true;守护线程
private static ScheduledExecutorService sharedExecutor = null;周期性线程池
private final DataAccumulator accumulator;数据采集器
private final long delayMillis;延时时间
private Future<?> future = null;回调
构造方法,初始化
public DataPublisher(DataAccumulator accumulator,
long delayMillis) {
this.accumulator = accumulator;
this.delayMillis = delayMillis;
}
判断是否在运行,通过查看回调是否为空
public synchronized boolean isRunning() {
return (future != null);
}
启动方法
public synchronized void start() {
if (future == null) {
Runnable task = new Runnable() {
public void run() {
try {
accumulator.publish();
} catch (Exception e) {
handleException(e);
}
}
};
通过周期性线程池去定时更新采集的数据
future = getExecutor().scheduleWithFixedDelay(task,
delayMillis, delayMillis,
TimeUnit.MILLISECONDS);
}
}
protected synchronized ScheduledExecutorService getExecutor() {
if (sharedExecutor == null) {
sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory());
}
return sharedExecutor;
}
private static final class PublishThreadFactory implements ThreadFactory {
PublishThreadFactory() { }
public Thread newThread(Runnable r) {
Thread t = new Thread(r, THREAD_NAME);
t.setDaemon(DAEMON_THREADS);
return t;
}
}
public synchronized void stop() {
if (future != null) {
future.cancel(false);
future = null;
}
}
protected void handleException(Exception e) {
}
}
- 1.1.18 ServerStats
用于统计负载均衡器中的各种统计信息
下面下来看看这个类的结构。
public class ServerStats {
private static final int DEFAULT_PUBLISH_INTERVAL = 60 * 1000; // = 1 minute 更新统计信息的间隔
private static final int DEFAULT_BUFFER_SIZE = 60 * 1000; // = 1000 requests/sec for 1 minute 每一秒存储的数据为长度=1000的数组
获取百分比数组-正态分布的有效百分比
private static final double[] PERCENTS = makePercentValues();
private static double[] makePercentValues() {
Percent[] percents = Percent.values();
double[] p = new double[percents.length];
for (int i = 0; i < percents.length; i++) {
p[i] = percents[i].getValue();
}
return p;
}
private static enum Percent {
TEN(10), TWENTY_FIVE(25), FIFTY(50), SEVENTY_FIVE(75), NINETY(90),
NINETY_FIVE(95), NINETY_EIGHT(98), NINETY_NINE(99), NINETY_NINE_POINT_FIVE(99.5);
private double val;
Percent(double val) {
this.val = val;
}
public double getValue() {
return val;
}
}
总请求数
AtomicLong totalRequests = new AtomicLong();
请求总数自增
public void incrementNumRequests(){
totalRequests.incrementAndGet();
}
dataDist用于采集响应数据,并且运算
private DataDistribution dataDist = new DataDistribution(1, PERCENTS);
responseTimeDist只是简单保存响应数据
private final Distribution responseTimeDist = new Distribution();
记录响应数据
public void noteResponseTime(double msecs){
dataDist.noteValue(msecs);
responseTimeDist.noteValue(msecs);
}
总的活跃请求数
AtomicInteger activeRequestsCount = new AtomicInteger(0);
上次活跃的时间
private volatile long lastActiveRequestsCountChangeTimestamp;
private MeasuredRate requestCountInWindow = new MeasuredRate(300000L);
最近有活跃请求的时间
private volatile long lastActiveRequestsCountChangeTimestamp;
最后访问的时间戳
private volatile long lastAccessedTimestamp;
第一次连接的时间戳
private volatile long firstConnectionTimestamp = 0;
活跃请求数的增加
public void incrementActiveRequestsCount() {
activeRequestsCount.incrementAndGet();
用于统计单位时间内的计数
requestCountInWindow.increment();
long currentTime = System.currentTimeMillis();
lastActiveRequestsCountChangeTimestamp = currentTime;
lastAccessedTimestamp = currentTime;
if (firstConnectionTimestamp == 0) {
firstConnectionTimestamp = currentTime;
}
}
活跃请求数的减少
public void decrementActiveRequestsCount() {
if (activeRequestsCount.decrementAndGet() < 0) {
activeRequestsCount.set(0);
}
lastActiveRequestsCountChangeTimestamp = System.currentTimeMillis();
}
统计300秒内的计数
private MeasuredRate requestCountInWindow = new MeasuredRate(300000L);
统计1秒内的失败请求数量
long failureCountSlidingWindowInterval = 1000;
private MeasuredRate serverFailureCounts = new MeasuredRate(failureCountSlidingWindowInterval);
public void addToFailureCount(){
serverFailureCounts.increment();
}
public long getFailureCount(){
return serverFailureCounts.getCurrentCount();
}
Server server;
初始化,开始数据采集
public void initialize(Server server) {
serverFailureCounts = new MeasuredRate(failureCountSlidingWindowInterval);
统计300秒内的计数
requestCountInWindow = new MeasuredRate(300000L);
if (publisher == null) {
dataDist = new DataDistribution(getBufferSize(), PERCENTS);
publisher = new DataPublisher(dataDist, getPublishIntervalMillis());
开始数据采集
publisher.start();
}
封装对应的服务信息
this.server = server;
}
}
- 1.1.19 MeasuredRate
用于统计单位时间内的数
public class MeasuredRate {
private final AtomicLong _lastBucket = new AtomicLong(0);
private final AtomicLong _currentBucket = new AtomicLong(0);
private final long _sampleInterval;
private volatile long _threshold;
public MeasuredRate(long sampleInterval){
_sampleInterval = sampleInterval;
_threshold = System.currentTimeMillis() + sampleInterval;
}
public long getCount() {
checkAndResetWindow();
return _lastBucket.get();
}
public long getCurrentCount() {
checkAndResetWindow();
return _currentBucket.get();
}
public void increment() {
checkAndResetWindow();
_currentBucket.incrementAndGet();
}
private void checkAndResetWindow() {
long now = System.currentTimeMillis();
if(_threshold < now) {
_lastBucket.set(_currentBucket.get());
_currentBucket.set(0);
_threshold = now + _sampleInterval;
}
}
public String toString(){
StringBuilder sb = new StringBuilder();
sb.append("count:" + getCount());
sb.append("currentCount:" + getCurrentCount());
return sb.toString();
}
}
- 2.1 编写一个测试方法,看看这个类如何使用
这里直接使用源码中的方法
public class ServerStatsTest {
public static void main(String args[]) {
ServerStats ss = new ServerStats();
//这里设置bufferSize的大小为1000的数组,记录1000个数据
ss.setBufferSize(1000);
//更新间隔为1秒
ss.setPublishInterval(1000);
//这里设置服务
ss.initialize(new Server("stonse", 80));
Random r = new Random(1459834);
模拟100个请求,最后输出平均时长以及其他统计数据
for (int i=0; i < 99; i++){
double rl = r.nextDouble() * 25.2;
ss.noteResponseTime(rl);
ss.incrementNumRequests();
try {
Thread.sleep(100);
System.out.println("ServerStats:avg:" + ss.getResponseTimeAvg());
System.out.println("ServerStats:90 percentile:" + ss.getResponseTime90thPercentile());
System.out.println("ServerStats:90 percentile:" + ss.getResponseTimePercentileNumValues());
} catch (InterruptedException e) {
}
}
System.out.println("done ---");
ss.close();
System.out.println("ServerStats:" + ss);
}
}
最后输出结果如下
ServerStats:avg:13.275308970139484
ServerStats:90 percentile:24.29368204858261
ServerStats:90 percentile:10
ServerStats:[Server:stonse:80; Zone:UNKNOWN; Total Requests:99; Successive connection failure:0; Total blackout seconds:0; Last connection made:Thu Jan 01 08:00:00 CST 1970; First connection made: Thu Jan 01 08:00:00 CST 1970; Active Connections:0; total failure count in last (1000) msecs:0; average resp time:13.275308970139484; 90 percentile resp time:24.29368204858261; 95 percentile resp time:24.29368204858261; min resp time:0.021071074712381233; max resp time:24.60484318031518; stddev resp time:7.121850590230442]
这个类其实就是简单的用于统计的。
可以算出平均响应时长,以及总活跃的请求数,等等相关指标,用于衡量服务的健康状态。
- 3 服务相关的统计
- 3.1 LoadBalancerStats
这个类其实是用来存储服务对应的统计信息,说白了就是对于服务统计信息的一个增删改查。
那么这些统计信息主要是用于负载均衡算法,因为往往这些指标,可以体现一个服务的一个健康状况。
public class LoadBalancerStats implements IClientConfigAware {
分区与统计信息的映射
volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
分区对应的服务列表
volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
public LoadBalancerStats(){
zoneStatsMap = new ConcurrentHashMap<String, ZoneStats>();
upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
}
缓存-这是guaua提供的一个缓存,具有数据定时失效的功能
private final LoadingCache<Server, ServerStats> serverStatsCache =
CacheBuilder.newBuilder()
.expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES)
.removalListener(new RemovalListener<Server, ServerStats>() {
@Override
public void onRemoval(RemovalNotification<Server, ServerStats> notification) {
notification.getValue().close();
}
})
.build(
new CacheLoader<Server, ServerStats>() {
public ServerStats load(Server server) {
return createServerStats(server);
}
});
创建服务的统计信息
protected ServerStats createServerStats(Server server) {
ServerStats ss = new ServerStats(this);
ss.setBufferSize(1000);
ss.setPublishInterval(1000);
ss.initialize(server);
return ss;
}
新增某个服务的统计信息
public void addServer(Server server) {
try {
serverStatsCache.get(server);
} catch (ExecutionException e) {
ServerStats stats = createServerStats(server);
serverStatsCache.asMap().putIfAbsent(server, stats);
}
}
获取某个服务的统计信息
protected ServerStats getServerStats(Server server) {
try {
return serverStatsCache.get(server);
} catch (ExecutionException e) {
ServerStats stats = createServerStats(server);
serverStatsCache.asMap().putIfAbsent(server, stats);
return serverStatsCache.asMap().get(server);
}
}
某个服务对应的统计信息-增加总的活跃请求次数
public void incrementActiveRequestsCount(Server server) {
ServerStats ss = getServerStats(server);
ss.incrementActiveRequestsCount();
}
某个服务对应的统计信息-减少总的活跃请求次数
public void decrementActiveRequestsCount(Server server) {
ServerStats ss = getServerStats(server);
ss.decrementActiveRequestsCount();
}
}
- 3.2 ZoneStats
分区的统计信息
其实说到底这个类,就是用于生成分区统计信息的。至于如何生成,其实还是依赖LoadBalancerStats 。
因为本身LoadBalancerStats 就有所有服务的统计信息。而分区统计信息的生成,其实就是将服务按分区聚合,然后根据分区对应的服务列表的统计信息,去生成。
而分区快照ZoneSnapshot,其实就是用于存储ZoneStats生成的分区统计信息的。
先看看用什么类存储
public class ZoneSnapshot {
final int instanceCount;
final double loadPerServer;
final int circuitTrippedCount;
final int activeRequestsCount;
public ZoneSnapshot() {
this(0, 0, 0, 0d);
}
public ZoneSnapshot(int instanceCount, int circuitTrippedCount, int activeRequestsCount, double loadPerServer) {
this.instanceCount = instanceCount;
this.loadPerServer = loadPerServer;
this.circuitTrippedCount = circuitTrippedCount;
this.activeRequestsCount = activeRequestsCount;
}
public final int getInstanceCount() {
return instanceCount;
}
public final double getLoadPerServer() {
return loadPerServer;
}
public final int getCircuitTrippedCount() {
return circuitTrippedCount;
}
public final int getActiveRequestsCount() {
return activeRequestsCount;
}
@Override
public String toString() {
return "ZoneSnapshot [instanceCount=" + instanceCount
+ ", loadPerServer=" + loadPerServer + ", circuitTrippedCount="
+ circuitTrippedCount + ", activeRequestsCount="
+ activeRequestsCount + "]";
}
}
下面再来看看ZoneStats如何生成分区统计信息
public class ZoneStats<T extends Server> {
private final LoadBalancerStats loadBalancerStats;统计信息(这个类中包含了所有服务的统计信息,以及分区对应服务列表)
private final String zone;分区名字
private static final String PREFIX = "ZoneStats_";前缀
private final Counter counter;
final String monitorId;
public ZoneStats(String name, String zone, LoadBalancerStats loadBalancerStats) {
this.zone = zone;
this.loadBalancerStats = loadBalancerStats;
monitorId = name + ":" + zone;
counter = Monitors.newCounter(PREFIX + name + "_" + zone + "_Counter");
Monitors.registerObject(monitorId, this);
}
public final String getZone() {
return zone;
}
有相关获取某个指标的方法,但是其实实现都是一样的。
@Monitor(name=PREFIX + "ActiveRequestsCount", type = DataSourceType.INFORMATIONAL)
public int getActiveRequestsCount() {
return loadBalancerStats.getActiveRequestsCount(zone);
}
}
以获取分区的总活跃请求数为例子。
public class LoadBalancerStats implements IClientConfigAware {
分区与服务列表的映射
volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<String, List<? extends Server>>();
public int getActiveRequestsCount(String zone) {
return getZoneSnapshot(zone).getActiveRequestsCount();
}
获取分区的快照
public ZoneSnapshot getZoneSnapshot(String zone) {
if (zone == null) {
return new ZoneSnapshot();
}
zone = zone.toLowerCase();
获取分区对应的服务列表
List<? extends Server> currentList = upServerListZoneMap.get(zone);
return getZoneSnapshot(currentList);
}
public ZoneSnapshot getZoneSnapshot(List<? extends Server> servers) {
if (servers == null || servers.size() == 0) {
return new ZoneSnapshot();
}
int instanceCount = servers.size();
int activeConnectionsCount = 0;
int activeConnectionsCountOnAvailableServer = 0;
int circuitBreakerTrippedCount = 0;
double loadPerServer = 0;
long currentTime = System.currentTimeMillis();
将每个服务的统计信息进行累加即可。
for (Server server: servers) {
ServerStats stat = getSingleServerStat(server);
if (stat.isCircuitBreakerTripped(currentTime)) {
circuitBreakerTrippedCount++;
} else {
activeConnectionsCountOnAvailableServer += stat.getActiveRequestsCount(currentTime);
}
activeConnectionsCount += stat.getActiveRequestsCount(currentTime);
}
if (circuitBreakerTrippedCount == instanceCount) {
if (instanceCount > 0) {
// should be NaN, but may not be displayable on Epic
loadPerServer = -1;
}
} else {
loadPerServer = ((double) activeConnectionsCountOnAvailableServer) / (instanceCount - circuitBreakerTrippedCount);
}
生成快照
return new ZoneSnapshot(instanceCount, circuitBreakerTrippedCount, activeConnectionsCount, loadPerServer);
}
}
- 4 如何利用统计数据
统计数据的生成先跳过,先来说说负载均衡器如何使用?
会利用到分区数据的只有ZoneAwareLoadBalancer,会根据分区的统计数据进行服务地址的分配。
源码如下
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
@Override
public Server chooseServer(Object key) {
如果分区只有一个,那么简单的利用rule进行服务的筛选即可
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
return super.chooseServer(key);
}
Server server = null;
try {
获取所有服务的负载均衡信息
LoadBalancerStats lbStats = getLoadBalancerStats();
创建分区的统计信息
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
。。。省略部分代码
通过rule来挑选出合适的分区
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
如果分区存在多个,则随机选取一个
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
获取该分区对应的负载均衡器
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
通过负载均衡器来选择服务,最后也是通过rule来选择服务
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
}
网友评论