美文网首页Spring Cloudspring cloud
Spring Cloud Hystrix 分析(三)之Hystr

Spring Cloud Hystrix 分析(三)之Hystr

作者: Blog | 来源:发表于2021-02-17 17:10 被阅读0次

    分析(二)我们总结到HystrixAutoConfiguration主要是健康状态指标的配置类,本节则开始分析Hystrix的核心配置类HystrixCircuitBreakerConfiguration,其中就包含断路器实现、断路器信息定时汇总等等


    HystrixCircuitBreakerConfiguration

    @Configuration
    public class HystrixCircuitBreakerConfiguration {
        //@HystrixCommand、@HystrixCollapser注解的实现
        @Bean
        public HystrixCommandAspect hystrixCommandAspect() {
            return new HystrixCommandAspect();
        }
        //清除Hystrix各种状态,如线程池
        @Bean
        public HystrixShutdownHook hystrixShutdownHook() {
            return new HystrixShutdownHook();
        }
        //注册Hystrix特征给featuresEndpoint
        //通过/actuator/features获取系统当前注册进来的features
        @Bean
        public HasFeatures hystrixFeature() {
            return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
        }
        //Hystrix的Servlet/Web配置
        @Configuration
        @ConditionalOnProperty(value = "hystrix.stream.endpoint.enabled", matchIfMissing = true)
        @ConditionalOnWebApplication
        @ConditionalOnClass({ Endpoint.class, HystrixMetricsStreamServlet.class })
        protected static class HystrixWebConfiguration {
    
            @Bean
            public HystrixStreamEndpoint hystrixStreamEndpoint() {
                return new HystrixStreamEndpoint();
            }
    
            @Bean
            public HasFeatures hystrixStreamFeature() {
                return HasFeatures.namedFeature("Hystrix Stream Servlet", HystrixStreamEndpoint.class);
            }
        }
        //定期处理(2s)Hystrix上报的数据,判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断
        @Configuration
        @ConditionalOnProperty(value = "hystrix.metrics.enabled", matchIfMissing = true)
        @ConditionalOnClass({ HystrixMetricsPoller.class, GaugeService.class })
        @EnableConfigurationProperties(HystrixMetricsProperties.class)
        protected static class HystrixMetricsPollerConfiguration implements SmartLifecycle {......}
    }
    

    以上我们重点关注HystrixCommandAspect与HystrixMetricsPollerConfiguration即可

    GenericCommand负责执行具体方法与Fallback方法

    HystrixCommandAspect

    @Aspect
    public class HystrixCommandAspect {
        //命令注解,本节重点分析
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
        public void hystrixCommandAnnotationPointcut() {}
        //请求合并注解,本节暂时不做分析,因为用的比较少,后续会针对进行分析
        @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
        public void hystrixCollapserAnnotationPointcut() {}
        //环绕切点
        @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
        public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
            Method method = getMethodFromTarget(joinPoint);
            Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
            if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
                throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                        "annotations at the same time");
            }
            //根据method方法对应的类型获取元数据工厂处理类,我们主要针对@HystrixCommand,所以得到CommandMetaHolderFactory
            MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
            //执行CommandMetaHolderFactory.create创建MetaHolder
            //设置方法名为默认的commandKey,设置fallback方法,设置groupKey(默认值为@HystrixCommand的类名)
            //如果存在@DefaultProperties注解则使用当前注解里面的参数设置groupKey、threadPoolKey
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            //顶层接口,用于统一接口,标记可以执行
            //创建GenericCommand,并通过HystrixCommandBuilderFactory#createGenericSetterBuilder设置groupKey、commandKey、threadPoolKey
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            //@HystrixCommand注解对应SYNCHRONOUS
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                    metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
            //执行命令GenericCommand
            Object result;
            try {
                if (!metaHolder.isObservable()) {
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = executeObservable(invokable, executionType, metaHolder);
                }
            } catch (HystrixBadRequestException e) {
                throw e.getCause();
            } catch (HystrixRuntimeException e) {
                throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
            }
            return result;
        }
        ......
    }
    

    通过以上代码片段与部分注释信息,我们大致可以得知,Hystrix通过Aop对方法进行解析,然后封装元数据,设置默认参数,如commandKey、groupKey、threadPoolKey,最终封装具有熔断、降级功能的命令执行器GenericCommand,那么下面我们就开始分析GenericCommand,一步一步揭晓GenericCommand的工作原理


    CommandExecutor

    public class CommandExecutor {
        public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
            ......
            switch (executionType) {
                case SYNCHRONOUS: {
                    return castToExecutable(invokable, executionType).execute();
                }
                case ASYNCHRONOUS: {......}
                case OBSERVABLE: {......}
                default:
                    throw new RuntimeException("unsupported execution type: " + executionType);
            }
        }
    
        private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
            if (invokable instanceof HystrixExecutable) {
                return (HystrixExecutable) invokable;
            }
            throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
        }
        ......
    }
    

    通过CommandExecutor.execute调用,会调用到GenericCommand.execute(),最终调用HystrixCommand.execute()方法


    HystrixCommand

    public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
        ......
        public R execute() {
            try {
                return queue().get();
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
        public Future<R> queue() {
            //AbstractCommand. toObservable()
            final Future<R> delegate = toObservable().toBlocking().toFuture();
            ......
        }
    }
    

    AbstractCommand

    abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
        ......
        public Observable<R> toObservable() {
            final AbstractCommand<R> _cmd = this;
            //命令执行结束后的清理逻辑
            final Action0 terminateCommandCleanup = new Action0() {......};
            //取消订阅后的清理逻辑
            final Action0 unsubscribeCommandCleanup = new Action0() {......};
            //Hystrix断路器、隔离相关逻辑
            final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {......};
            //发射数据时的逻辑,扩展接口,扩展给外部使用,内部无其他逻辑,直接返回的结果
            final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {......};
            //命令执行完成后的逻辑,也是扩展给外部使用
            final Action0 fireOnCompletedHook = new Action0() {......};
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    //主要是标记当前命令状态,命令只能使用一次,类似状态机
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
                    commandStartTimestamp = System.currentTimeMillis();
                    if (properties.requestLogEnabled().get()) {
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
                    //缓存标示,缓存开关打开并且存在缓存key
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    //缓存key,结合@CacheResult注解使用
                    final String cacheKey = getCacheKey();
    
                    //首先从缓存中获取是否存在相同命令的结果
                    if (requestCacheEnabled) {
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
                    //创建具有断路器、隔离逻辑的Observable
                    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);
                    Observable<R> afterCache;
    
                    //如果满足缓存条件
                    if (requestCacheEnabled && cacheKey != null) {
                        // 从缓存中包装Observable
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // 如果其他线程已经存入这个缓存命令,那么直接执行
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // 返回刚刚我们创建的ObservableCommand
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        afterCache = hystrixObservable;
                    }
                    return afterCache
                            //命令执行结束后的清理逻辑
                            .doOnTerminate(terminateCommandCleanup)     
                            //取消订阅后的清理动作
                            .doOnUnsubscribe(unsubscribeCommandCleanup) 
                            //命令执行完成后的逻辑,主要也是扩展给外部使用
                            .doOnCompleted(fireOnCompletedHook);
                }
            });
        }
    }
    

    在上面注释信息中,我们可以知道最核心的功能莫过于applyHystrixSemantics这个处理逻辑,内部包含实现线程池隔离、信号量隔离,以及断路器健康状况信息上报,由于逻辑比较多,所以我们放到下一节进行分析与总结!

    相关文章

      网友评论

        本文标题:Spring Cloud Hystrix 分析(三)之Hystr

        本文链接:https://www.haomeiwen.com/subject/ikxlxltx.html