Basic usage of Hystrix
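import com.netflix.hystrix.HystrixCommand;

public class HelloCommand extends HystrixCommand<String> {
    public HelloCommand(Setter setter) {
        super(setter);
    }

    // The guarded business logic; Hystrix runs it under the command's isolation strategy.
    @Override
    protected String run() throws Exception {
        return "success";
    }

    // Returned when run() fails, times out, is rejected, or is short-circuited.
    @Override
    protected String getFallback() {
        return "fail";
    }
}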
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.junit.Test;

public class HelloCommandTest {
    @Test
    public void test() {
        String THREAD_POOL_PREFIX = "thread-";
        String groupKey = "group-100";
        int timeoutMilliseconds = 2000;
        HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(groupKey))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(THREAD_POOL_PREFIX + groupKey))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeoutMilliseconds))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10)/*.withMaxQueueSize(90)*/.withQueueSizeRejectionThreshold(1000));
        // A new HelloCommand instance is required for every execute() call, because each instance carries its own state.
        // The check below, quoted from AbstractCommand#toObservable, enforces this:
        /*if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            //TODO make a new error type for this
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }*/
        HelloCommand command = new HelloCommand(setter);
        String result = command.execute();
    }
}
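The single-use rule quoted in the test above comes down to one atomic state transition. The following is a minimal sketch of that guard using only the JDK; it is not Hystrix code, and the class name OneShotTask and its two states are hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for a command: it may be executed exactly once.
final class OneShotTask<T> {
    private enum State { NOT_STARTED, STARTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final Supplier<T> body;

    OneShotTask(Supplier<T> body) {
        this.body = body;
    }

    T execute() {
        // Only the first caller wins the transition; every later call fails fast.
        if (!state.compareAndSet(State.NOT_STARTED, State.STARTED)) {
            throw new IllegalStateException(
                    "This instance can only be executed once. Please instantiate a new instance.");
        }
        return body.get();
    }
}
```

Calling execute() a second time on the same OneShotTask throws IllegalStateException, which is why the test builds a fresh HelloCommand for each call.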
How thread isolation works
Hystrix keeps a global thread pool cache keyed by threadPoolKey: each threadPoolKey gets exactly one thread pool.
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<>();
// com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
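The lookup above is a get-or-create cache keyed by a string. As a rough JDK-only model of the same idea (not Hystrix's implementation), the sketch below uses computeIfAbsent instead of the explicit synchronized block; KeyedPools and the fixed pool size are assumptions made for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model of a per-key thread pool cache (hypothetical class).
final class KeyedPools {
    private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<>();
    private final int threadsPerPool;

    KeyedPools(int threadsPerPool) {
        this.threadsPerPool = threadsPerPool;
    }

    // Returns the pool for this key, creating it atomically on first use.
    ExecutorService get(String key) {
        return pools.computeIfAbsent(key, k -> Executors.newFixedThreadPool(threadsPerPool));
    }

    // Stops every pool; intended for tests and shutdown hooks.
    void shutdownAll() {
        pools.values().forEach(ExecutorService::shutdownNow);
    }
}
```

Because each key owns its own threads, a task that blocks every thread of the "systemA" pool does not delay a task submitted to the "systemB" pool.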
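So the key to thread isolation is the threadPoolKey: one threadPoolKey maps to one thread pool.
This article looks at two ways to isolate threads, at the class level and at the instance level, with the focus on instance-level isolation.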
Class-level thread isolation
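The simplest approach is to implement many HystrixCommand<T> subclasses, such as HelloCommand in the code above; you could also have ACommand, BCommand, CCommand, DCommand, and so on. As long as each class uses its own group key (or thread pool key), each class gets its own thread pool, which gives you thread isolation.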
How it works (optional; skipping it does not affect the rest of the article)
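When a class is instantiated without an explicit thread pool key (which would be set through Setter), the group name is used as the thread pool key by default. The group name is mandatory, because withGroupKey is the only way to construct a Setter instance.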
// com.netflix.hystrix.HystrixCommand.Setter#withGroupKey
public static Setter withGroupKey(HystrixCommandGroupKey groupKey) {
return new Setter(groupKey);
}
// com.netflix.hystrix.AbstractCommand#initThreadPoolKey
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
if (threadPoolKeyOverride == null) {
// we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
if (threadPoolKey == null) {
/* use HystrixCommandGroup if HystrixThreadPoolKey is null */
// no threadPoolKey was set, so fall back to the groupKey
return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
} else {
return threadPoolKey;
}
} else {
// we have a property defining the thread-pool so use it instead
return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
}
}
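The precedence in initThreadPoolKey can be restated as a small pure function. This is a string-based sketch of the same three-way choice, with a hypothetical class name; it leaves out the Hystrix key types so that it runs on the JDK alone.

```java
// Simplified, string-based model of the precedence in initThreadPoolKey (hypothetical class).
final class ThreadPoolKeyResolver {
    private ThreadPoolKeyResolver() {
    }

    static String resolve(String threadPoolKey, String groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride != null) {
            return threadPoolKeyOverride; // a configured override always wins
        }
        if (threadPoolKey != null) {
            return threadPoolKey;         // explicit key supplied through the Setter
        }
        return groupKey;                  // default: the group name
    }
}
```

For example, resolve(null, "groupA", null) yields "groupA", which is why commands that only set a group key share one pool per group.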
Instance-level thread isolation
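In practice, though, you often do not want that many classes. You may instead want HelloCommand#run to be isolated according to the external system being called. For example, depending on the request parameters, HelloCommand#run calls system A, system B, system C, system D, and so on, and you do not want a problem in system A to affect calls to the other systems.
Your code might then look like this: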
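public class Bean {
    // The external system to call
    private String requestSystem;
    // Other fields and getters/setters omitted
}

public class Service {
    public String doService(Bean bean) {
        // One Setter instance per external system being called
        HystrixCommand.Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
        HelloCommand command = new HelloCommand(setter);
        return command.execute();
    }

    public HystrixCommand.Setter getSetterFromRequestSystem(String requestSystem) {
        // Use requestSystem as the group name, command key, and threadPoolKey;
        // further property defaults are omitted here
        return HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(requestSystem))
                .andCommandKey(HystrixCommandKey.Factory.asKey(requestSystem))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(requestSystem));
    }
}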
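Each external system gets its own com.netflix.hystrix.HystrixCommand.Setter instance. By giving each instance its own groupKey, commandKey, and threadPoolKey, thread isolation is easy to achieve.
Note that metrics, including fallback statistics, are keyed by commandKey, so it is best to give each thread pool its own commandKey as well.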
private static HystrixCommandMetrics initMetrics(HystrixCommandMetrics fromConstructor, HystrixCommandGroupKey groupKey,
HystrixThreadPoolKey threadPoolKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties) {
if (fromConstructor == null) {
// create (or look up) the HystrixCommandMetrics instance keyed by commandKey
return HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
} else {
return fromConstructor;
}
}
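To see why a shared commandKey blurs the statistics, consider a toy counter registry keyed the same way. This is only an illustration on plain JDK types, not HystrixCommandMetrics; FallbackCounters and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical registry: one fallback counter per command key.
final class FallbackCounters {
    private final ConcurrentMap<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Records one fallback under the given command key.
    void recordFallback(String commandKey) {
        counters.computeIfAbsent(commandKey, k -> new LongAdder()).increment();
    }

    // Returns the number of fallbacks recorded for the key, or 0 if none.
    long fallbacks(String commandKey) {
        LongAdder adder = counters.get(commandKey);
        return adder == null ? 0L : adder.sum();
    }
}
```

If calls to system A and system B both record under the key "hello", their fallbacks land in one counter and can no longer be told apart; recording under "requestSystemA" and "requestSystemB" keeps them separate.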
Making it configurable (optional)
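For getSetterFromRequestSystem, you may want the parameters used to build the Setter to come from configuration, so that they can be tuned easily.
First, here is what Setter looks like: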
![](https://img.haomeiwen.com/i1716507/90fa458a18eb65e5.png)
For example, suppose your configuration looks like this:
requestSystemA = {
"groupKey": "requestSystemA",
"commandKey": "requestSystemA",
"threadPoolKey": "requestSystemA",
"commandPropertiesDefaults": {
"executionTimeoutInMilliseconds": 2000,
"executionTimeoutEnabled": true
},
"threadPoolPropertiesDefaults": {
"coreSize": 10,
"maxQueueSize": 100,
"queueSizeRejectionThreshold": 50
}
}
requestSystemB = {
"groupKey": "requestSystemB",
"commandKey": "requestSystemB",
"threadPoolKey": "requestSystemB",
"commandPropertiesDefaults": {
"executionTimeoutInMilliseconds": 3000,
"executionTimeoutEnabled": true
},
"threadPoolPropertiesDefaults": {
"coreSize": 20,
"maxQueueSize": 200,
"queueSizeRejectionThreshold": 50
}
}
requestSystemC = {
"groupKey": "requestSystemC",
"commandKey": "requestSystemC",
"threadPoolKey": "requestSystemC",
"commandPropertiesDefaults": {
"executionTimeoutInMilliseconds": 2000,
"executionTimeoutEnabled": true
},
"threadPoolPropertiesDefaults": {
"coreSize": 5,
"maxQueueSize": 5,
"queueSizeRejectionThreshold": 3
}
}
Your code might then look like this:
public class Bean {
    // The external system to call
    private String requestSystem;
    // Other fields and getters/setters omitted
}

public class Service {
    public String doService(Bean bean) {
        // One Setter instance per external system being called
        HystrixCommand.Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
        HelloCommand command = new HelloCommand(setter);
        return command.execute();
    }

    public HystrixCommand.Setter getSetterFromRequestSystem(String requestSystem) {
        // Look up the configuration for requestSystem and turn it into a Setter
        return getSetterFromConfig(getConfig(requestSystem));
    }

    public HystrixCommand.Setter getSetterFromConfig(String json) {
        // getConfig and jsonToObject are placeholders that are left undefined here
        return jsonToObject(json, HystrixCommand.Setter.class);
    }
}
At this point, the problem has become how to turn the configuration into a Setter instance.
Summary
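This article covered two ways to achieve thread isolation. The @HystrixCommand annotation can achieve it as well; because its command key defaults to the method name, each annotated method gets its own circuit breaker as long as the method names (or explicit command keys) are distinct.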
Further notes
HystrixCommand execution sequence:
com.netflix.hystrix.HystrixCommand#execute
-> com.netflix.hystrix.HystrixCommand#queue
-> com.netflix.hystrix.AbstractCommand#toObservable
-> com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
circuitBreaker.attemptExecution()
executionSemaphore.tryAcquire()
-> com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
-> com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
-> com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
-> com.netflix.hystrix.HystrixCommand#getExecutionObservable
Observable.just(run())
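Read as control flow, the sequence above checks the circuit breaker first, then tries to take an execution permit, and only then calls run(), falling back whenever one of those steps fails. The sketch below models that order with JDK types only; GatedCall and its constructor parameters are hypothetical, and the real implementation is built on RxJava observables rather than direct calls.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Simplified model of the gate order: circuit check, then permit, then run (hypothetical class).
final class GatedCall<T> {
    private final BooleanSupplier circuitAllows; // stands in for circuitBreaker.attemptExecution()
    private final Semaphore permits;             // stands in for executionSemaphore
    private final Supplier<T> run;
    private final Supplier<T> fallback;

    GatedCall(BooleanSupplier circuitAllows, Semaphore permits, Supplier<T> run, Supplier<T> fallback) {
        this.circuitAllows = circuitAllows;
        this.permits = permits;
        this.run = run;
        this.fallback = fallback;
    }

    T execute() {
        if (!circuitAllows.getAsBoolean()) {
            return fallback.get();               // short-circuited: run() is never called
        }
        if (!permits.tryAcquire()) {
            return fallback.get();               // rejected: no permit available
        }
        try {
            return run.get();
        } catch (RuntimeException e) {
            return fallback.get();               // run() failed
        } finally {
            permits.release();                   // give the permit back on every path after acquire
        }
    }
}
```

The ordering matters: a call rejected by the circuit check never consumes a permit, so an open circuit sheds load before any concurrency limit is touched.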
HystrixCommand thread pool sequence:
com.netflix.hystrix.HystrixCommand#execute
-> com.netflix.hystrix.HystrixCommand#queue
toFuture()
-> rx.internal.operators.OperatorSubscribeOn#call
-> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0)
-> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.ThreadPoolWorker#schedule(rx.functions.Action0)
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();// HystrixThreadPool
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
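The trace above shows that execute() goes through queue(), and that the work is eventually handed to the thread pool's executor with submit. A minimal JDK-only sketch of that relationship follows; PooledCall is a hypothetical class, and it skips the RxJava scheduler layers that sit between queue() and the executor in the real code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Simplified model: execute() is a blocking wrapper around queue() (hypothetical class).
final class PooledCall<T> {
    private final ExecutorService pool;
    private final Callable<T> run;

    PooledCall(ExecutorService pool, Callable<T> run) {
        this.pool = pool;
        this.run = run;
    }

    // Asynchronous form: hands run to the pool and returns immediately.
    Future<T> queue() {
        return pool.submit(run);
    }

    // Synchronous form: waits for the queued work to finish on a pool thread.
    T execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Even though execute() blocks the caller, the body runs on a thread owned by the pool, which is what makes per-pool isolation possible.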
How the @HystrixCommand annotation works
Just read the com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect class. Its methodsAnnotatedWithHystrixCommand method intercepts methods annotated with @HystrixCommand or @HystrixCollapser.
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
// Read the @HystrixCommand annotation data through joinPoint and build a MetaHolder instance
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// Build a HystrixInvokable (a parent interface of HystrixCommand) from the MetaHolder instance
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
// Calls execute(), which follows essentially the same path as the HystrixCommand#execute analysis above
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
So what is the commandKey of a @HystrixCommand method?
By default, it is the method name.
// com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect.CommandMetaHolderFactory#create
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
if (isCompileWeaving()) {
builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
}
return builder.defaultCommandKey(method.getName()) // the commandKey defaults to the method name
.hystrixCommand(hystrixCommand)
.observableExecutionMode(hystrixCommand.observableExecutionMode())
.executionType(executionType)
.observable(ExecutionType.OBSERVABLE == executionType)
.build();
}
You can of course also set it explicitly with @HystrixCommand(commandKey = "xxx").
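The default-then-override rule can be reproduced with plain reflection. The sketch below uses a stand-in annotation so that it runs without the javanica dependency; Guarded, CommandKeys, and RemoteClient are all hypothetical names, and the real aspect resolves the key through MetaHolder rather than a helper like this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation standing in for @HystrixCommand.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String commandKey() default "";
}

final class CommandKeys {
    private CommandKeys() {
    }

    // An explicit commandKey wins; otherwise the method name is used.
    static String resolve(Method method) {
        Guarded guarded = method.getAnnotation(Guarded.class);
        if (guarded != null && !guarded.commandKey().isEmpty()) {
            return guarded.commandKey();
        }
        return method.getName();
    }
}

// Hypothetical service with one default key and one explicit key.
class RemoteClient {
    @Guarded
    public String fetchUser() {
        return "user";
    }

    @Guarded(commandKey = "orders")
    public String fetchOrders() {
        return "orders";
    }
}
```

One consequence of a name-based default is that two annotated methods with the same name in different classes resolve to the same key, so an explicit commandKey is the safer choice when method names may collide.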