美文网首页Spring Cloudspring cloud
Spring Cloud Hystrix 分析(三)之Hystr

Spring Cloud Hystrix 分析(三)之Hystr

作者: Blog | 来源:发表于2021-02-17 17:10 被阅读0次

分析(二)我们总结到HystrixAutoConfiguration主要是健康状态指标的配置类,本节则开始分析Hystrix的核心配置类HystrixCircuitBreakerConfiguration,其中就包含断路器实现、断路器信息定时汇总等等


HystrixCircuitBreakerConfiguration

@Configuration
public class HystrixCircuitBreakerConfiguration {
    //@HystrixCommand、@HystrixCollapser注解的实现
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }
    //清除Hystrix各种状态,如线程池
    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }
    //注册Hystrix特征给featuresEndpoint
    //通过/actuator/features获取系统当前注册进来的features
    @Bean
    public HasFeatures hystrixFeature() {
        return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
    }
    //Hystrix的Servlet/Web配置
    @Configuration
    @ConditionalOnProperty(value = "hystrix.stream.endpoint.enabled", matchIfMissing = true)
    @ConditionalOnWebApplication
    @ConditionalOnClass({ Endpoint.class, HystrixMetricsStreamServlet.class })
    protected static class HystrixWebConfiguration {

        @Bean
        public HystrixStreamEndpoint hystrixStreamEndpoint() {
            return new HystrixStreamEndpoint();
        }

        @Bean
        public HasFeatures hystrixStreamFeature() {
            return HasFeatures.namedFeature("Hystrix Stream Servlet", HystrixStreamEndpoint.class);
        }
    }
    //定期处理(2s)Hystrix上报的数据,判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断
    @Configuration
    @ConditionalOnProperty(value = "hystrix.metrics.enabled", matchIfMissing = true)
    @ConditionalOnClass({ HystrixMetricsPoller.class, GaugeService.class })
    @EnableConfigurationProperties(HystrixMetricsProperties.class)
    protected static class HystrixMetricsPollerConfiguration implements SmartLifecycle {......}
}

以上我们重点关注HystrixCommandAspect与HystrixMetricsPollerConfiguration即可

GenericCommand负责执行具体方法与Fallback方法

HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {
    //命令注解,本节重点分析
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {}
    //请求合并注解,本节暂时不做分析,因为用的比较少,后续会针对进行分析
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {}
    //环绕切点
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        //根据method方法对应的类型获取元数据工厂处理类,我们主要针对@HystrixCommand,所以得到CommandMetaHolderFactory
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //执行CommandMetaHolderFactory.create创建MetaHolder
        //设置方法名为默认的commandKey,设置fallback方法,设置groupKey(默认值为@HystrixCommand的类名)
        //如果存在@DefaultProperties注解则使用当前注解里面的参数设置groupKey、threadPoolKey
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        //顶层接口,用于统一接口,标记可以执行
        //创建GenericCommand,并通过HystrixCommandBuilderFactory#createGenericSetterBuilder设置groupKey、commandKey、threadPoolKey
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        //@HystrixCommand注解对应SYNCHRONOUS
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
        //执行命令GenericCommand
        Object result;
        try {
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    ......
}

通过以上代码片段与部分注释信息,我们大致可以得知,Hystrix通过Aop对方法进行解析,然后封装元数据,设置默认参数,如commandKey、groupKey、threadPoolKey,最终封装具有熔断、降级功能的命令执行器GenericCommand,那么下面我们就开始分析GenericCommand,一步一步揭晓GenericCommand的工作原理


CommandExecutor

public class CommandExecutor {
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        ......
        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {......}
            case OBSERVABLE: {......}
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable) invokable;
        }
        throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
    }
    ......
}

通过CommandExecutor.execute调用,会调用到GenericCommand.execute(),最终调用HystrixCommand.execute()方法


HystrixCommand

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    public Future<R> queue() {
        //AbstractCommand. toObservable()
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        ......
    }
}

AbstractCommand

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        //命令执行结束后的清理逻辑
        final Action0 terminateCommandCleanup = new Action0() {......};
        //取消订阅后的清理逻辑
        final Action0 unsubscribeCommandCleanup = new Action0() {......};
        //Hystrix断路器、隔离相关逻辑
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {......};
        //发射数据时的逻辑,扩展接口,扩展给外部使用,内部无其他逻辑,直接返回的结果
        final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {......};
        //命令执行完成后的逻辑,也是扩展给外部使用
        final Action0 fireOnCompletedHook = new Action0() {......};
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                //主要是标记当前命令状态,命令只能使用一次,类似状态机
                if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                    IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                    throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                }
                commandStartTimestamp = System.currentTimeMillis();
                if (properties.requestLogEnabled().get()) {
                    if (currentRequestLog != null) {
                        currentRequestLog.addExecutedCommand(_cmd);
                    }
                }
                //缓存标示,缓存开关打开并且存在缓存key
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                //缓存key,结合@CacheResult注解使用
                final String cacheKey = getCacheKey();

                //首先从缓存中获取是否存在相同命令的结果
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                //创建具有断路器、隔离逻辑的Observable
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;

                //如果满足缓存条件
                if (requestCacheEnabled && cacheKey != null) {
                    // 从缓存中包装Observable
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // 如果其他线程已经存入这个缓存命令,那么直接执行
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // 返回刚刚我们创建的ObservableCommand
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }
                return afterCache
                        //命令执行结束后的清理逻辑
                        .doOnTerminate(terminateCommandCleanup)     
                        //取消订阅后的清理动作
                        .doOnUnsubscribe(unsubscribeCommandCleanup) 
                        //命令执行完成后的逻辑,主要也是扩展给外部使用
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
}

在上面注释信息中,我们可以知道最核心的功能莫过于applyHystrixSemantics这个处理逻辑,内部包含实现线程池隔离、信号量隔离,以及断路器健康状况信息上报,由于逻辑比较多,所以我们放到下一节进行分析与总结!

相关文章

网友评论

    本文标题:Spring Cloud Hystrix 分析(三)之Hystr

    本文链接:https://www.haomeiwen.com/subject/ikxlxltx.html