美文网首页微服务架构和实践
HystrixCommand实现线程隔离

HystrixCommand实现线程隔离

作者: 李不言被占用了 | 来源:发表于2019-06-27 17:42 被阅读236次

    Hystrix的基本使用

    public class HelloCommand extends HystrixCommand<String> {
    
        public HelloCommand(Setter setter) {
            super(setter);
        }
    
        @Override
        protected String run() throws Exception {
    
            return "success";
        }
    
        @Override
        protected String getFallback() {
            
            return "fail";
        }
    }
    
    public class HelloCommandTest() {
    
        @Test
        public void test() {
            String THREAD_POOL_PREFIX = "thread-";
            String groupKey = "groud-100";
            int timeoutMilliseconds = 2000;
            
            Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(groupKey))
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(THREAD_POOL_PREFIX + groupKey))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeoutMilliseconds))
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10)./*withMaxQueueSize(90).*/withQueueSizeRejectionThreshold(1000)
            
            // 每次调用execute方法都需要创建一个新的HelloCommand实例,因为每个实例都有自己的状态
            /*if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }*/
            HelloCommand command = new HelloCommand(setter);
            String result = command.execute();
        }
    }
    

    线程隔离原理

    存在全局的线程池线程池缓存,根据threadPoolKey来区分不同的线程池,每个threadPoolKey一个线程池。

    final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<>();
    
    // com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
    /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
        // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
        String key = threadPoolKey.name();
    
        // this should find it for all but the first time
        HystrixThreadPool previouslyCached = threadPools.get(key);
        if (previouslyCached != null) {
            return previouslyCached;
        }
    
        // if we get here this is the first time so we need to initialize
        synchronized (HystrixThreadPool.class) {
            if (!threadPools.containsKey(key)) {
                threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
            }
        }
        return threadPools.get(key);
    }
    

    所以要实现线程隔离的关键在于threadPoolKey,一个threadPoolKey映射一个线程池。

    这里探讨两种线程隔离的方式:类级别实例级别,重点讲讲实例级别的线程隔离。

    类级别线程隔离

    最简单的方法是实现很多的HystrixCommand<T>,如上面代码里的HelloCommand,你还可以有ACommandBCommandCCommandDCommand……这样每个类都会有自己的线程池,从而实现线程隔离。
    原理(可略过,不影响全文阅读)
    每个类在实例化的时候,如果没有指定线程池(通过Setter指定),默认会使用组名作为一组线程池,组名必须设置,因为构造Setter实例的唯一方法是withGroupKey

    // com.netflix.hystrix.HystrixCommand.Setter#withGroupKey
    public static Setter withGroupKey(HystrixCommandGroupKey groupKey) {
        return new Setter(groupKey);
    }
    
    // com.netflix.hystrix.AbstractCommand#initThreadPoolKey
    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
        if (threadPoolKeyOverride == null) {
            // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
            if (threadPoolKey == null) {
                /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
                // 没有设置threadPoolKey使用groupKey
                return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
            } else {
                return threadPoolKey;
            }
        } else {
            // we have a property defining the thread-pool so use it instead
            return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
        }
    }
    

    实例级别线程隔离

    但其实很多时候你并不希望有这么多的类,你可能希望在HelloCommand#run中根据不同的请求系统进行线程隔离,比如:HelloCommand#run根据不同的请求参数,会访问系统A、系统B、系统C、系统D……你不希望系统A出现问题影响到代码对其他系统的访问。
    这个时候你的代码可能是这样的:

    public class Bean {
        // 需要请求的系统
        private String requestSystem;
        
        // 省略其他属性和setter/getter方法
    }
    
    public class Serivce() {
        public String doService(Bean bean) {
            // 每个请求的外部系统一个setter实例
            Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
            HelloCommand command = new HelloCommand(setter);
            return command.execute();
        }
        
        public Setter getSetterFromRequestSystem(String requestSystem) {
            // 将requestSystem设置为组名,或者threadPoolKey
            return Setter.withGroupKey(requestSystem)...;
        }
    }
    

    每个请求系统会有自己的com.netflix.hystrix.HystrixCommand.Setter实例,通过不同实例的groupKey、commandKey、threadPoolKey等属性的设置,很容易实现线程隔离。

    需要说明的是,fallback的统计数据是通过commandKey来区分的,所以最好每个线程池也有自己的commandKey。

    private static HystrixCommandMetrics initMetrics(HystrixCommandMetrics fromConstructor, HystrixCommandGroupKey groupKey,
                                                     HystrixThreadPoolKey threadPoolKey, HystrixCommandKey commandKey,
                                                     HystrixCommandProperties properties) {
        if (fromConstructor == null) {
            // 以commandKey来创建一个HystrixCommandMetrics实例
            return HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
        } else {
            return fromConstructor;
        }
    }
    
    

    可配置化(可略过)

    对于getSetterFromRequestSystem你可能希望实例化里面的参数是从配置中获取的,这样可以方便的调整配置参数。
    先看看Setter长什么样:

    image.png

    比如你的配置是这样的:

    requestSystemA = {
        "groupKey": "requestSystemA", 
        "commandKey": "requestSystemA", 
        "threadPoolKey": "requestSystemA", 
        "commandPropertiesDefaults": {
            "executionTimeoutInMilliseconds": 2000, 
            "executionTimeoutEnabled": true
        }, 
        "threadPoolPropertiesDefaults": {
            "coreSize": 10, 
            "maxQueueSize": 100, 
            "queueSizeRejectionThreshold": 50
        }
    }
    
    requestSystemB = {
        "groupKey": "requestSystemB", 
        "commandKey": "requestSystemB", 
        "threadPoolKey": "requestSystemB", 
        "commandPropertiesDefaults": {
            "executionTimeoutInMilliseconds": 3000, 
            "executionTimeoutEnabled": true
        }, 
        "threadPoolPropertiesDefaults": {
            "coreSize": 20, 
            "maxQueueSize": 200, 
            "queueSizeRejectionThreshold": 50
        }
    }
    
    requestSystemC = {
        "groupKey": "requestSystemC", 
        "commandKey": "requestSystemC", 
        "threadPoolKey": "requestSystemC", 
        "commandPropertiesDefaults": {
            "executionTimeoutInMilliseconds": 2000, 
            "executionTimeoutEnabled": true
        }, 
        "threadPoolPropertiesDefaults": {
            "coreSize": 5, 
            "maxQueueSize": 5, 
            "queueSizeRejectionThreshold": 3
        }
    }
    

    这时候你的代码可能如下:

    public class Bean {
        // 需要请求的系统
        private String requestSystem;
        
        // 省略其他属性和setter/getter方法
    }
    
    public class Serivce() {
        public String doService(Bean bean) {
            // 每个请求的外部系统一个setter实例
            Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
            HelloCommand command = new HelloCommand(setter);
            return command.execute();
        }
        
        public Setter getSetterFromRequestSystem(String requestSystem) {
            // 通过requestSystem获取到配置并实例化成Setter
            return getSetterFromConfig(getConfig(requestSystem));
        }
        
        public Setter getSetterFromConfig(String json) {
            return jsonToObject(json, Setter.class);
        }
    }
    

    看到这里,问题就变成了如何把配置信息变成Setter实例的问题了。

    总结

    本文讲解了两种实现线程隔离的方式,当然@HystrixCommand注解也能实现线程隔离,可以保证每个方法是一个独立的熔断器。

    扩展

    HystrixCommand逻辑时序:

    com.netflix.hystrix.HystrixCommand#execute
    -> com.netflix.hystrix.HystrixCommand#queue
    -> com.netflix.hystrix.AbstractCommand#toObservable
    -> com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
        circuitBreaker.attemptExecution()
        executionSemaphore.tryAcquire()
    -> com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
    -> com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
    -> com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
    -> com.netflix.hystrix.HystrixCommand#getExecutionObservable
        Observable.just(run())
    
    

    HystrixCommand线程池逻辑时序:

    com.netflix.hystrix.HystrixCommand#execute
     -> com.netflix.hystrix.HystrixCommand#queue
        toFuture()
    -> rx.internal.operators.OperatorSubscribeOn#call
    -> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0)
    -> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.ThreadPoolWorker#schedule(rx.functions.Action0)
        ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();// HystrixThreadPool
        FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    

    @HystrixCommand注解原理

    直接看com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect类即可。com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect#methodsAnnotatedWithHystrixCommand方法拦截@HystrixCommand@HystrixCollapser注解。

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }
    
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }
    
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        // 通过jointPoint获取@HystrixCommand注解的信息,创建MetaHolder实例
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        // 通过MetaHolder实例创建HystrixInvokable(HystrixCommand的父接口)
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
    
        Object result;
        try {
            if (!metaHolder.isObservable()) {
                // 调用execute()方法,跟上面分析HystrixCommand#execute方法基本一致
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    

    那么@HystrixCommand的commandKey是什么呢?
    答案是方法名。

    // com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect.CommandMetaHolderFactory#create
    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        return builder.defaultCommandKey(method.getName())// commandKey设置为方法名
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
    }
    

    当然你也可以通过@HystrixCommand(commandKey = "xxx")设置。

    参考
    【SpringCloud】HystrixCommand的threadPoolKey默认值及线程池初始化
    从源码看hystrix的工作原理
    Hystrix源码解析
    Hystrix问题记录
    Spring Cloud系列--Hystrix源码
    Spring Cloud 源码学习之 Hystrix 工作原理
    How-it-Works
    Hystrix执行过程源码分析

    相关文章

      网友评论

        本文标题:HystrixCommand实现线程隔离

        本文链接:https://www.haomeiwen.com/subject/ksmzfctx.html