Main features
Flow control (rate limiting), circuit breaking, degradation, and monitoring.
image.png
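Basic usage
QPS (queries per second): the number of requests per second.
Resource: whatever Sentinel protects, such as an interface (endpoint) or a method.
Quick start
The overall idea resembles acquiring a lock: if the resource is acquired successfully, the logic that follows runs; otherwise a BlockException is thrown.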
https://github.com/alibaba/Sentinel/wiki/%E6%96%B0%E6%89%8B%E6%8C%87%E5%8D%97#%E5%85%AC%E7%BD%91-demo
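The lock-like flow described above can be sketched without the Sentinel dependency. This is a simplified stand-in, not Sentinel's API: SimpleGuard, BlockedException and the concurrency-based quota are hypothetical names and choices made only for illustration, and the exception is unchecked here to keep the sketch short.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a "blocked" exception (unchecked to keep the sketch short).
class BlockedException extends RuntimeException {
    BlockedException(String resource) {
        super("blocked: " + resource);
    }
}

// Hypothetical guard: admits at most `limit` concurrent entries for one resource.
class SimpleGuard {
    private final String resource;
    private final int limit;
    private final AtomicInteger inFlight = new AtomicInteger();

    SimpleGuard(String resource, int limit) {
        this.resource = resource;
        this.limit = limit;
    }

    // Like acquiring a lock: returns normally on success, throws when the quota is used up.
    void enter() {
        if (inFlight.incrementAndGet() > limit) {
            inFlight.decrementAndGet();
            throw new BlockedException(resource);
        }
    }

    // Like releasing a lock: must be called once for every successful enter().
    void exit() {
        inFlight.decrementAndGet();
    }

    // Runs the business logic if admitted, otherwise returns the fallback value.
    String call(Supplier<String> logic, String fallback) {
        try {
            enter();
        } catch (BlockedException e) {
            return fallback;
        }
        try {
            return logic.get();
        } finally {
            exit();
        }
    }
}
```

With a limit of 1, a call made while another entry is still open returns the fallback, and succeeds again once exit() has been called.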
Console
Sample code
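import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Registers the aspect that makes @SentinelResource take effect.
@Configuration
public class AopConfiguration {
    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }
}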
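import com.alibaba.csp.sentinel.annotation.SentinelResource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SentinelController {
    @SentinelResource(value = "sayHello") // method-level flow control
    @GetMapping("/say")
    public String sayHello() {
        System.out.println("hello world");
        return "hello world";
    }
}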
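import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.ArrayList;
import java.util.List;

@SpringBootApplication
public class SentinelDemoApplication {
    public static void main(String[] args) {
        initFlowRules();
        SpringApplication.run(SentinelDemoApplication.class, args);
    }

    // Initialize the flow rules.
    private static void initFlowRules() {
        List<FlowRule> rules = new ArrayList<>(); // collection of flow rules
        FlowRule flowRule = new FlowRule();
        flowRule.setResource("sayHello"); // resource name (a method name or an interface)
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); // threshold type: QPS
        flowRule.setCount(10); // threshold: 10 requests per second
        rules.add(flowRule);
        FlowRuleManager.loadRules(rules);
    }
}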
Result
image.png
How it works
How the slot chain is built
Chain of Responsibility pattern
image.png
image.png
Source code analysis
1 com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build
The slots that make up the chain are loaded through SPI and appended in sorted order.
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }
    return chain;
}
2 com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#addLast
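addLast links each new slot to the one before it. Initially end points to the head node, an anonymous subclass of the abstract class AbstractLinkedProcessorSlot, which only serves as the start of the chain.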
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
    end.setNext(protocolProcessor);
    end = protocolProcessor;
}
image.png
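To make the linking concrete, here is a minimal, self-contained sketch of the same chain-of-responsibility idea. It is not Sentinel's code: LinkedSlot, NamedSlot and SlotChain are hypothetical names, and each slot only records its name before handing the call to the next one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified slot: does its own work, then passes the call on.
abstract class LinkedSlot {
    private LinkedSlot next;

    void setNext(LinkedSlot next) {
        this.next = next;
    }

    // Forward the call to the next slot, if there is one.
    void fireEntry(List<String> trace) {
        if (next != null) {
            next.entry(trace);
        }
    }

    abstract void entry(List<String> trace);
}

// A slot that records its name and then continues down the chain.
class NamedSlot extends LinkedSlot {
    private final String name;

    NamedSlot(String name) {
        this.name = name;
    }

    @Override
    void entry(List<String> trace) {
        trace.add(name);
        fireEntry(trace);
    }
}

class SlotChain {
    // Head node that only forwards; it exists so that addLast always has a predecessor.
    private final LinkedSlot first = new LinkedSlot() {
        @Override
        void entry(List<String> trace) {
            fireEntry(trace);
        }
    };
    private LinkedSlot end = first;

    // Link the new slot behind the current tail, then make it the tail.
    void addLast(LinkedSlot slot) {
        end.setNext(slot);
        end = slot;
    }

    // Run the whole chain and return the order in which the slots were visited.
    List<String> entry() {
        List<String> trace = new ArrayList<>();
        first.entry(trace);
        return trace;
    }
}
```

Adding slots named "statistic" and "flow" in that order and calling entry() visits them in the same order, because each addLast call only touches the current tail.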
How flow control is implemented
Sliding window (StatisticSlot)
image.png
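sampleCount: the number of buckets (sample windows) the statistics interval is divided into.
intervalInMs: the total length of the statistics interval covered by the sliding window, in milliseconds.
windowLengthInMs: the length of a single bucket in milliseconds, equal to intervalInMs / sampleCount.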
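The relationship between the three parameters is plain integer arithmetic. The sketch below shows it in isolation; WindowMath is a hypothetical helper, and the values 1000 ms and 5 buckets are assumptions chosen to match the 200 ms buckets used in the comments of the source excerpt that follows.

```java
// Hypothetical helper that isolates the sliding-window arithmetic.
class WindowMath {
    // Length of one bucket: the interval split into sampleCount equal parts.
    static int windowLengthInMs(int intervalInMs, int sampleCount) {
        return intervalInMs / sampleCount;
    }

    // Index of the bucket covering timeMillis in a circular array of sampleCount buckets.
    static int timeIdx(long timeMillis, int windowLengthInMs, int sampleCount) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % sampleCount);
    }

    // Start timestamp of the bucket covering timeMillis.
    static long windowStart(long timeMillis, int windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}
```

With intervalInMs = 1000 and sampleCount = 5, each bucket is 200 ms long; a timestamp of 910 falls into index 4 with a bucket start of 800, and a timestamp of 1676 falls into index 3 with a bucket start of 1600.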
Source code analysis
1 com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // ... (excerpt: the call that runs the remaining slots and the exception handling are omitted)

    // Request passed, add thread count and pass count.
    node.increaseThreadNum();
    node.addPassRequest(count); // recorded in the sliding window
    // Handle pass event with registered entry callback handlers.
    for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
        handler.onPass(context, resourceWrapper, node, count, args);
    }
}
2 com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
    // Example with windowLengthInMs = 200: 910 / 200 = 4 (then taken modulo the array length)
    int idx = calculateTimeIdx(timeMillis);
    // Start time of the fifth window (index 4) in that example: 800
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
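The three cases handled above (bucket absent, bucket up to date, bucket stale) can be tried out in a much smaller form. The following is a simplified sketch, not Sentinel's LeapArray: SlidingWindowCounter and Bucket are hypothetical names, a stale bucket is replaced by a CAS instead of being reset under a lock, and the total only sums buckets whose start lies within the last intervalInMs milliseconds.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified sliding-window counter built on a circular array of buckets.
class SlidingWindowCounter {
    // One bucket: its start timestamp plus the number of requests counted in it.
    static final class Bucket {
        final long start;
        final AtomicLong count = new AtomicLong();

        Bucket(long start) {
            this.start = start;
        }
    }

    private final int windowLengthInMs;
    private final int intervalInMs;
    private final AtomicReferenceArray<Bucket> array;

    SlidingWindowCounter(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    // Returns the bucket covering timeMillis, installing a fresh one if it is absent or stale.
    Bucket currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % array.length());
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null || old.start < windowStart) {
                // Absent or stale: try to swap in a fresh bucket; loop again if another thread won.
                Bucket fresh = new Bucket(windowStart);
                if (array.compareAndSet(idx, old, fresh)) {
                    return fresh;
                }
            } else if (old.start == windowStart) {
                return old; // up to date
            } else {
                // The provided time is behind the stored bucket: hand out a detached bucket.
                return new Bucket(windowStart);
            }
        }
    }

    void add(long timeMillis, long n) {
        currentWindow(timeMillis).count.addAndGet(n);
    }

    // Sum of all buckets that started less than intervalInMs before timeMillis.
    long total(long timeMillis) {
        long sum = 0;
        for (int i = 0; i < array.length(); i++) {
            Bucket b = array.get(i);
            if (b != null && timeMillis - b.start < intervalInMs) {
                sum += b.count.get();
            }
        }
        return sum;
    }
}
```

Replacing the stale bucket instead of resetting it in place avoids the update lock, at the cost of allocating a new bucket on every rollover; that trade-off is a choice of this sketch, not a claim about Sentinel.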
How rules are checked
FlowSlot
Source code analysis
1 com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
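The loop above fails fast: the first rule that rejects the request ends the check with an exception. Here is a self-contained sketch of that shape, under stated assumptions: SimpleFlowRule, SimpleFlowException and SimpleFlowChecker are hypothetical names, the current QPS is passed in directly instead of being read from a statistics node, and a rule passes while the current QPS plus the requested count does not exceed its threshold.

```java
import java.util.Collection;

// Hypothetical simplified rule: a resource may pass at most `count` requests per second.
class SimpleFlowRule {
    final String resource;
    final double count;

    SimpleFlowRule(String resource, double count) {
        this.resource = resource;
        this.count = count;
    }

    // Passes while the already-passed QPS plus the requested amount stays within the threshold.
    boolean canPass(double currentQps, int acquireCount) {
        return currentQps + acquireCount <= count;
    }
}

// Hypothetical exception thrown when a rule rejects the request (unchecked for brevity).
class SimpleFlowException extends RuntimeException {
    SimpleFlowException(String resource) {
        super("flow limited: " + resource);
    }
}

class SimpleFlowChecker {
    // Throws on the first rule that rejects the request; returns normally if every rule passes.
    static void checkFlow(Collection<SimpleFlowRule> rules, double currentQps, int acquireCount) {
        if (rules == null) {
            return; // no rules configured for this resource
        }
        for (SimpleFlowRule rule : rules) {
            if (!rule.canPass(currentQps, acquireCount)) {
                throw new SimpleFlowException(rule.resource);
            }
        }
    }
}
```

With a threshold of 10, a request arriving at a current QPS of 9 passes, while one arriving at a current QPS of 10 is rejected.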