仅consumer端会有cluster层的处理,在RegistryProtocol.refer()
返回使用RegistryDirectory对象(见注册中心)初始化的cluster层Invoker,返回给配置解析层,交给spring管理。
3.1 路由实现
- 条件路由
基于url属性信息作为条件进行provider路由选择 - 脚本路由
⽀持 JDK 脚本引擎的脚本所实现的路由算法 - 文件路由
文件保存脚本程序,实际转换成脚本路由实现
3.2 动态配置
3.3 负载均衡
除一致性hash策略外其他负载均衡策略都基于provider权重.获取权重代码如下,其中:
- WARMUP_KEY表示是否支持预热,获取的是服务预热时间,默认预热时间10分钟。
- 服务上线时间超过预热时间则使用配置的接口权重。
- 服务上线时间小于预热时间,则接口权重=(上线时间/预热时间)*权重。 最小为1,最大为配置的接口权重。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
3.3.1 最少活跃
- ActiveLimitFilter消费端过滤器统计并更新RpcStatus接口信息。
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
获取provider的活跃调用数。 - 取最少活跃数的provider。
- 如有多个相同最少活跃数,则按照3.3.2随机策略选择provider。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // The least active value of all invokers
int leastCount = 0; // The number of invokers having the same least active value (leastActive)
int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
int totalWeight = 0; // The sum of with warmup weights
int firstWeight = 0; // Initial value, used for comparision
boolean sameWeight = true; // Every invoker has the same weight value?
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int afterWarmup = getWeight(invoker, invocation); // Weight
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexs[0]);
}
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offsetWeight = random.nextInt(totalWeight) + 1;
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
3.3.2 轮询
每个invoker有一个固定权重值,和一个选择权重值。每次invoker取选择权重最大的invoker。
- 若落选,则选择权重+=固定权重。
- 若选中,则选择权重-=所有invoker固定权重总和。
即权重高的会更快的优先被选中。
3.3.3 随机
- 若权重相同则随机选取。
- 若权重不同,则以总权重值为区间随机选一个数,前n个provider权重大于随机值,则选择第n个provider
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
3.3.4 一致性hash
- 以treemap存储hash值
- 每个provider节点分解成默认160个虚拟节点,计算hash值后存储到treemap中
- treemap.tailMap()获取目标hash的后续节点,存在则选择第一个。不存在则选择整个treemap的第一个。
3.4 失败策略
策略 | 说明 |
---|---|
FailoverClusterInvoker | 失败换provider重试 |
FailFastClusterInvoker | 快速失败,只尝试一次 |
FailSafeClusterInvoker | 失败忽略,纪录日志 |
FailBackClusterInvoker | 失败重试,重走provider list,负载均衡筛选 |
BroadcastClusterInvoker | 广播,一个失败则失败 |
ForkClusterInvoker | 多个同时发,一个成功则成功 |
MergeClusterInvoker | 多个同时发,获取结果后merge返回 |
AvailableClusterInvoker | 第一个有效的provider |
网友评论