上篇文章讲了流控规则,而除了流控规则之后还有降级、热点、系统、授权等规则,这篇文件主要讲降级规则。
降级规则主要处理节点是DegradeSlot,其中具体逻辑由DegradeRuleManager.checkDegrade
实现
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
if (degradeRules == null) {
return;
}
List<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
获取所有的降级规则,进行一个个的校验,校验逻辑是由DegradeRule
实现,这里和流控规则FlowRule类似,先看下内部属性
public class DegradeRule extends AbstractRule {
//
private static final int RT_MAX_EXCEED_N = 5;
private double count;
private int timeWindow;
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private volatile boolean cut = false;
private AtomicLong passCount = new AtomicLong(0);
- RT_MAX_EXCEED_N:在降级策略为RT的情况下,如果连续RT_MAX_EXCEED_N个请求都大于配置的值,那么会在窗口时间内会进行降级状态,所有流量都会返回false(抛出 DegradeException);在降级策略为异常比例的情况下,总qps且异常数大于该值才会进行异常比例的判断
- count:降级策略为RT则表示响应时间;降级策略为异常比例则表示异常比例;降级策略为异常数则表示异常数量
- timeWindow:降级的时间窗口,在该窗口时间内请求都不能通过
- grade:降级熔断策略
- cut:是否被降级熔断,如果true,则请求过来直接拒绝
- passCount:降级策略为RT的时候用来统计超过配置值的数量
接下来看下DegradeRule
的处理
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//是否降级
if (cut) {
return false;
}
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//省略降级策略的处理....
// 到达这里表示触发了降级规则,需要降级熔断
// 这里用锁是防止多线程更新cut,导致重复创建了ResetTask
synchronized (lock) {
if (!cut) {// 如果没有降级熔断,则需要设置为true
// Automatically degrade.
cut = true;
// 创建一个延时任务,在时间窗口过后将cut设为false,将passCount设为0
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
// 返回false表示当前操作失败
return false;
}
}
接下来看下具体策略的处理
降级策略:RT
double rt = clusterNode.avgRt();
//从node中获取平均rt
if (rt < this.count) {// 如果小于配置的值,则可以直接返回成功
// 将passCount重置
passCount.set(0);
return true;
}
// 到达这里表示当前请求rt已经超过阈值,是否返回失败需要判断passCount是否大于等于RT_MAX_EXCEED_N
// 递增passCount的值,然后判断是否大于RT_MAX_EXCEED_N
// 如果小于RT_MAX_EXCEED_N那么还是返回成功
// 直到连续超过阈值RT_MAX_EXCEED_N次才返回失败
if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
return true;
}
这种情况下需要注意一种情况:
假设接口平均rt很小,但是某一次请求时间大幅度的上升,这样会导致整个接口的rt大幅度上升,这样会导致异常降级,例如某个接口平均rt为1ms,配置的阈值为10ms,例如某一次请求rt达到了1s,导致整个接口的平均rt到了100ms,那么就会导致错误降级熔断
请求量小的接口可能会出现上述情况,如qps只有10,某一次接口达到了1s会导致整个接口平均rt上升到100ms左右
降级策略:失败比例
// 异常qps
double exception = clusterNode.exceptionQps();
// 成功qps
double success = clusterNode.successQps();
// 总qps=passQps+blockQps
long total = clusterNode.totalQps();
// 总qps小于RT_MAX_EXCEED_N则无视
if (total < RT_MAX_EXCEED_N) {
return true;
}
double realSuccess = success - exception;
// 失败数小于RT_MAX_EXCEED_N且成功数小于0的情况则无视
if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {
return true;
}
// 异常比例判断
if (exception / success < count) {
return true;
}
注意:
-
clusterNode.successQps()
返回的是成功执行完了Slot链且没有被规则拦截的数量 -
clusterNode.exceptionQps()
返回的是基于1的基础且业务处理中出现异常的数量,该需要需要用Tracer.trace(t)
捕获,才会计入统计 - 由12可知,
clusterNode.successQps()
包含了clusterNode.exceptionQps()
,所以realSuccess需要减去重合的部分才是真正成功的数量
降级策略:异常数
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
异常数这个规则比较简单,就是判断一分钟内的异常数是否大于阈值。这里还有个注意点:在时间窗口小于60s的时候,会导致降级熔断时间窗口过后,还是会被降级熔断,是因为这里是判断的一分钟的异常数,时间窗口太小会导致恢复熔断后,异常数还是大于等于阈值。
测试代码如下(在官方提供的ExceptionCountDegradeDemo基础上修改),先配置一个规则
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// set limit exception count to 4
rule.setCount(4);
rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);
/**
* When degrading by {@link RuleConstant#DEGRADE_GRADE_EXCEPTION_COUNT}, time window
* less than 60 seconds will not work as expected. Because the exception count is
* summed by minute, when a short time window elapsed, the degradation condition
* may still be satisfied.
*/
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
}
配置一个降级规则,策略是异常数,数量为4,熔断时间窗口是10s(代码中的注释是官方提交的,从这里也看出降级熔断窗口太小是会有问题的)
运行代码如下:
private static final String KEY = "abc";
private static AtomicInteger total = new AtomicInteger();
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger bizException = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 1;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
initDegradeRule();
// 运行10次,每次都抛出异常
for (int i = 0; i < 10; i++) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
pass.addAndGet(1);
throw new RuntimeException("throw runtime ");
} catch (BlockException e) {
block.addAndGet(1);
} catch (Throwable t) {
bizException.incrementAndGet();
Tracer.trace(t);
} finally {
total.addAndGet(1);
if (entry != null) {
entry.exit();
}
}
}
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get() + ", bizException:" + bizException.get());
// 上面运行后,会被降级熔断,窗口时间为10s,这里睡眠11s,等待窗口时间过去
Thread.sleep(11000);
// 继续执行
Entry entry = null;
try {
entry = SphU.entry(KEY);
pass.addAndGet(1);
} catch (BlockException e) {
block.addAndGet(1);
} catch (Throwable t) {
bizException.incrementAndGet();
Tracer.trace(t);
} finally {
total.addAndGet(1);
if (entry != null) {
entry.exit();
}
}
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get() + ", bizException:" + bizException.get());
}
上面代码中,第一次for循环执行10次逻辑,每次都抛出异常,并且用Tracer.trace记录我们的业务异常,由于配置的异常数为4,所以执行第四次结果过后,就已经被降级熔断了,打印的结果如下:
total:10, pass:4, block:6, bizException:4
可以看到后面6次被block了,即被降级规则降级熔断了,此时sleep11s,这个时候窗口时间已经过了,但是执行后续代码发现输出如下:
total:11, pass:4, block:7, bizException:4
即这次请求也被block了,因为恢复之后异常数还是4,仍然不符合exception < count
的判断,这时如果将sleep的时间设置成60s,输出如下
total:11, pass:5, block:6, bizException:4
这时候,就正常了,因为统计的时间窗口已经往后移动了,统计的原理需要了解一下sentinel的滑动时间窗口的原理
网友评论