分析(二)我们总结到HystrixAutoConfiguration主要是健康状态指标的配置类,本节则开始分析Hystrix的核心配置类HystrixCircuitBreakerConfiguration,其中就包含断路器实现、断路器信息定时汇总等等
HystrixCircuitBreakerConfiguration
@Configuration
public class HystrixCircuitBreakerConfiguration {
//@HystrixCommand、@HystrixCollapser注解的实现
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
//清除Hystrix各种状态,如线程池
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
//注册Hystrix特征给featuresEndpoint
//通过/actuator/features获取系统当前注册进来的features
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
//Hystrix的Servlet/Web配置
@Configuration
@ConditionalOnProperty(value = "hystrix.stream.endpoint.enabled", matchIfMissing = true)
@ConditionalOnWebApplication
@ConditionalOnClass({ Endpoint.class, HystrixMetricsStreamServlet.class })
protected static class HystrixWebConfiguration {
@Bean
public HystrixStreamEndpoint hystrixStreamEndpoint() {
return new HystrixStreamEndpoint();
}
@Bean
public HasFeatures hystrixStreamFeature() {
return HasFeatures.namedFeature("Hystrix Stream Servlet", HystrixStreamEndpoint.class);
}
}
//定期处理(2s)Hystrix上报的数据,判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断
@Configuration
@ConditionalOnProperty(value = "hystrix.metrics.enabled", matchIfMissing = true)
@ConditionalOnClass({ HystrixMetricsPoller.class, GaugeService.class })
@EnableConfigurationProperties(HystrixMetricsProperties.class)
protected static class HystrixMetricsPollerConfiguration implements SmartLifecycle {......}
}
以上我们重点关注HystrixCommandAspect与HystrixMetricsPollerConfiguration即可
GenericCommand负责执行具体方法与Fallback方法HystrixCommandAspect
@Aspect
public class HystrixCommandAspect {
//命令注解,本节重点分析
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {}
//请求合并注解,本节暂时不做分析,因为用的比较少,后续会针对进行分析
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {}
//环绕切点
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
//根据method方法对应的类型获取元数据工厂处理类,我们主要针对@HystrixCommand,所以得到CommandMetaHolderFactory
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
//执行CommandMetaHolderFactory.create创建MetaHolder
//设置方法名为默认的commandKey,设置fallback方法,设置groupKey(默认值为@HystrixCommand的类名)
//如果存在@DefaultProperties注解则使用当前注解里面的参数设置groupKey、threadPoolKey
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//顶层接口,用于统一接口,标记可以执行
//创建GenericCommand,并通过HystrixCommandBuilderFactory#createGenericSetterBuilder设置groupKey、commandKey、threadPoolKey
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
//@HystrixCommand注解对应SYNCHRONOUS
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
//执行命令GenericCommand
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
......
}
通过以上代码片段与部分注释信息,我们大致可以得知,Hystrix通过Aop对方法进行解析,然后封装元数据,设置默认参数,如commandKey、groupKey、threadPoolKey,最终封装具有熔断、降级功能的命令执行器GenericCommand,那么下面我们就开始分析GenericCommand,一步一步揭晓GenericCommand的工作原理
CommandExecutor
public class CommandExecutor {
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
......
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {......}
case OBSERVABLE: {......}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
......
}
通过CommandExecutor.execute调用,会调用到GenericCommand.execute(),最终调用HystrixCommand.execute()方法
HystrixCommand
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
......
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
public Future<R> queue() {
//AbstractCommand. toObservable()
final Future<R> delegate = toObservable().toBlocking().toFuture();
......
}
}
AbstractCommand
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
......
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
//命令执行结束后的清理逻辑
final Action0 terminateCommandCleanup = new Action0() {......};
//取消订阅后的清理逻辑
final Action0 unsubscribeCommandCleanup = new Action0() {......};
//Hystrix断路器、隔离相关逻辑
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {......};
//发射数据时的逻辑,扩展接口,扩展给外部使用,内部无其他逻辑,直接返回的结果
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {......};
//命令执行完成后的逻辑,也是扩展给外部使用
final Action0 fireOnCompletedHook = new Action0() {......};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//主要是标记当前命令状态,命令只能使用一次,类似状态机
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
//缓存标示,缓存开关打开并且存在缓存key
final boolean requestCacheEnabled = isRequestCachingEnabled();
//缓存key,结合@CacheResult注解使用
final String cacheKey = getCacheKey();
//首先从缓存中获取是否存在相同命令的结果
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
//创建具有断路器、隔离逻辑的Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
//如果满足缓存条件
if (requestCacheEnabled && cacheKey != null) {
// 从缓存中包装Observable
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// 如果其他线程已经存入这个缓存命令,那么直接执行
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// 返回刚刚我们创建的ObservableCommand
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
//命令执行结束后的清理逻辑
.doOnTerminate(terminateCommandCleanup)
//取消订阅后的清理动作
.doOnUnsubscribe(unsubscribeCommandCleanup)
//命令执行完成后的逻辑,主要也是扩展给外部使用
.doOnCompleted(fireOnCompletedHook);
}
});
}
}
在上面注释信息中,我们可以知道最核心的功能莫过于applyHystrixSemantics这个处理逻辑,内部包含实现线程池隔离、信号量隔离,以及断路器健康状况信息上报,由于逻辑比较多,所以我们放到下一节进行分析与总结!
网友评论