1. 测试用例
我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:
DemoController:
@RestController
public class DemoController {
@Autowired
private TestService service;
@GetMapping("/foo")
public String apiFoo(@RequestParam(required = false) Long t) throws Exception {
if (t == null) {
t = System.currentTimeMillis();
}
service.test();
return service.hello(t);
}
@GetMapping("/baz/{name}")
public String apiBaz(@PathVariable("name") String name) {
return service.helloAnother(name);
}
}
TestServiceImpl:
@Service
public class TestServiceImpl implements TestService {
@Override
@SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class})
public void test() {
System.out.println("Test");
}
@Override
@SentinelResource(value = "hello", fallback = "helloFallback")
public String hello(long s) {
if (s < 0) {
throw new IllegalArgumentException("invalid arg");
}
return String.format("Hello at %d", s);
}
@Override
@SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback",
exceptionsToIgnore = {IllegalStateException.class})
public String helloAnother(String name) {
if (name == null || "bad".equals(name)) {
throw new IllegalArgumentException("oops");
}
if ("foo".equals(name)) {
throw new IllegalStateException("oops");
}
return "Hello, " + name;
}
public String helloFallback(long s, Throwable ex) {
// Do some log here.
ex.printStackTrace();
return "Oops, error occurred at " + s;
}
public String defaultFallback() {
System.out.println("Go to default fallback");
return "default_fallback";
}
}
启动类DemoApplication
:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
在启动这个工程上增加参数:
-Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj
如图:
打开http://localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:
1.1 流控测试
访问 http://localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控
页面可以看到:
- 其中,资源名为:
然后我们在快速刷新http://localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:
Oops, error occurred at 188
实时监控为:
2. 注解版源码分析
使用注解@SentinelResource
核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect
类,这是一个切面类:
@Aspect // 切面
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 指定切入点为@SentinelResource 注解
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
// 环绕通知
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
Method originMethod = resolveMethod(pjp);
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
Entry entry = null;
try {
// 要织入的,增强的功能
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 调用目标方法
return pjp.proceed();
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
核心方法SphU.entry()
:
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
// 注意 第4个参数值为 1
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
throws BlockException {
// count 参数:表示当前请求可以增加多少个计数
// 注意 第5个参数为false
return entryWithType(name, resourceType, entryType, count, false, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
// 将信息封装为一个资源对象
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
// 返回一个资源操作对象entry
// prioritized 为true 表示当前访问必须等待"根据其优先级计算出的时间"后才通过
// prioritized 为 false 则当前请求无需等待
return entryWithPriority(resource, count, prioritized, args);
}
我们重点看一下CtSph#entryWithPriority
:
/**
* @param resourceWrapper
* @param count 默认为1
* @param prioritized 默认为false
* @param args
* @return
* @throws BlockException
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 从ThreadLocal中获取Context
// 一个请求会占用一个线程,一个线程会绑定一个context
Context context = ContextUtil.getContext();
// 若context是 NullContext类型,则表示当前系统中的context数量已经超过阈值
// 即访问的请求的数量已经超出了阈值,此时直接返回一个无需做规则检测的资源操作对象
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
// 当前线程中没有绑定context,则创建一个context并将其放入到Threadlocal
if (context == null) {
// todo Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
// 若全局开关是关闭的,直接返回一个无需做规则检测的资源操作对象
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// todo 查找SlotChain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
// 若没有知道chain,则意味着chain数量超出了阈值
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 创建一个资源操作对象
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// todo 对资源进行操作
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
2.1 默认Context创建
当前线程没有绑定Context,则创建一个context并将其放入到Threadlocal
。核心方法为 InternalContextUtil.internalEnter
:
public static Context enter(String name, String origin) {
if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
throw new ContextNameDefineException(
"The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
}
return trueEnter(name, origin);
}
protected static Context trueEnter(String name, String origin) {
// 尝试从ThreadLocal中获取context
Context context = contextHolder.get();
// 若Threadlocal中没有,则尝试从缓存map中获取
if (context == null) {
// 缓存map的key为context名称,value为EntranceNode
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// DCL 双重检测锁,防止并发创建对象
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
// 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 创建一个EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
// 将新建的node添加到Root
Constants.ROOT.addChild(node);
// 将新建的node写入到缓存map
// 为了防止"迭代稳定性问题"-iterate stable 对于共享集合的写操作
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
// 将context的name与entranceNode 封装成context
context = new Context(node, name);
// 初始化context的来源
context.setOrigin(origin);
// 将context写入到ThreadLocal
contextHolder.set(context);
}
return context;
}
注意:因为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
是 HashMap结构,所以存在并发安全问题,采用 代码中方式进行添加操作。
2.2 查找并创建SlotChain
构建调用链lookProcessChain(resourceWrapper)
:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 缓存map的key为资源 value为其相关的SlotChain
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
// DCL
// 若缓存中没有相关的SlotChain 则创建一个并放入到缓存中
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
// 缓存map的size 大于 chain数量的最大阈值,则直接返回null,不在创建新的chain
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// todo 创建新的chain
chain = SlotChainProvider.newSlotChain();
// 防止 迭代稳定性问题
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
我们直接看核心方法SlotChainProvider.newSlotChain();
:
public static ProcessorSlotChain newSlotChain() {
// 若builder不为null,则直接使用builder构建一个chain
// 否则先创建一个builder
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
// 通过SPI方式创建builder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
// 若通过SPI未能创建builder,则创建一个默认的DefaultSlotChainBuilder
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
// todo 构建一个chain
return slotChainBuilder.build();
}
private SlotChainProvider() {}
}
2.2.1 创建slotChainBuilder
// 通过SPI方式创建builder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
通过SPI方法创建slotChainBuilder
,去项目中META-INF.service
中获取:
2.2.2 slotChainBuilder.build()
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 通过SPI方式构建Slot
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
通过SPI机制,去项目中META-INF.service
中获取,在sentinel-core
项目中:
还有一个ParamFlowSlot
,在sentinel-extension/sentinel-parameter-flow-control
下:
我们点击 NodeSelectorSlot
, 类上面是有 优先级order,数字越小,优先级越高。
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
优先级常量为:
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEGRADE_SLOT = -1000;
我们看代码中的变量sortedSlotList
,已经按照优先级排序好了:
我们看一下构建的ProcessorSlotChain
,类似一个单链表结构,如下:
我们看一下相关的类结构:DefaultProcessorSlotChain
:
// 这是一个单向链表,默认包含一个接节点,且有两个指针first 和end同时指向这个节点
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
}
AbstractLinkedProcessorSlot
:
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// 声明一个同类型的变量,则可以指向下一个Slot节点
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
构建完成后的SlotChain
和工作原理图一样:
网友评论