美文网首页微服务架构和实践
HystrixCommand实现线程隔离

HystrixCommand实现线程隔离

作者: 李不言被占用了 | 来源:发表于2019-06-27 17:42 被阅读236次

Hystrix的基本使用

public class HelloCommand extends HystrixCommand<String> {

    public HelloCommand(Setter setter) {
        super(setter);
    }

    @Override
    protected String run() throws Exception {

        return "success";
    }

    @Override
    protected String getFallback() {
        
        return "fail";
    }
}

public class HelloCommandTest() {

    @Test
    public void test() {
        String THREAD_POOL_PREFIX = "thread-";
        String groupKey = "groud-100";
        int timeoutMilliseconds = 2000;
        
        Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
                .andCommandKey(HystrixCommandKey.Factory.asKey(groupKey))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(THREAD_POOL_PREFIX + groupKey))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeoutMilliseconds))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10)./*withMaxQueueSize(90).*/withQueueSizeRejectionThreshold(1000)
        
        // 每次调用execute方法都需要创建一个新的HelloCommand实例,因为每个实例都有自己的状态
        /*if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            //TODO make a new error type for this
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }*/
        HelloCommand command = new HelloCommand(setter);
        String result = command.execute();
    }
}

线程隔离原理

存在全局的线程池线程池缓存,根据threadPoolKey来区分不同的线程池,每个threadPoolKey一个线程池。

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<>();

// com.netflix.hystrix.HystrixThreadPool.Factory#getInstance
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

所以要实现线程隔离的关键在于threadPoolKey,一个threadPoolKey映射一个线程池。

这里探讨两种线程隔离的方式:类级别实例级别,重点讲讲实例级别的线程隔离。

类级别线程隔离

最简单的方法是实现很多的HystrixCommand<T>,如上面代码里的HelloCommand,你还可以有ACommandBCommandCCommandDCommand……这样每个类都会有自己的线程池,从而实现线程隔离。
原理(可略过,不影响全文阅读)
每个类在实例化的时候,如果没有指定线程池(通过Setter指定),默认会使用组名作为一组线程池,组名必须设置,因为构造Setter实例的唯一方法是withGroupKey

// com.netflix.hystrix.HystrixCommand.Setter#withGroupKey
public static Setter withGroupKey(HystrixCommandGroupKey groupKey) {
    return new Setter(groupKey);
}

// com.netflix.hystrix.AbstractCommand#initThreadPoolKey
private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {
    if (threadPoolKeyOverride == null) {
        // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
        if (threadPoolKey == null) {
            /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
            // 没有设置threadPoolKey使用groupKey
            return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
        } else {
            return threadPoolKey;
        }
    } else {
        // we have a property defining the thread-pool so use it instead
        return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
    }
}

实例级别线程隔离

但其实很多时候你并不希望有这么多的类,你可能希望在HelloCommand#run中根据不同的请求系统进行线程隔离,比如:HelloCommand#run根据不同的请求参数,会访问系统A、系统B、系统C、系统D……你不希望系统A出现问题影响到代码对其他系统的访问。
这个时候你的代码可能是这样的:

public class Bean {
    // 需要请求的系统
    private String requestSystem;
    
    // 省略其他属性和setter/getter方法
}

public class Serivce() {
    public String doService(Bean bean) {
        // 每个请求的外部系统一个setter实例
        Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
        HelloCommand command = new HelloCommand(setter);
        return command.execute();
    }
    
    public Setter getSetterFromRequestSystem(String requestSystem) {
        // 将requestSystem设置为组名,或者threadPoolKey
        return Setter.withGroupKey(requestSystem)...;
    }
}

每个请求系统会有自己的com.netflix.hystrix.HystrixCommand.Setter实例,通过不同实例的groupKey、commandKey、threadPoolKey等属性的设置,很容易实现线程隔离。

需要说明的是,fallback的统计数据是通过commandKey来区分的,所以最好每个线程池也有自己的commandKey。

private static HystrixCommandMetrics initMetrics(HystrixCommandMetrics fromConstructor, HystrixCommandGroupKey groupKey,
                                                 HystrixThreadPoolKey threadPoolKey, HystrixCommandKey commandKey,
                                                 HystrixCommandProperties properties) {
    if (fromConstructor == null) {
        // 以commandKey来创建一个HystrixCommandMetrics实例
        return HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
    } else {
        return fromConstructor;
    }
}

可配置化(可略过)

对于getSetterFromRequestSystem你可能希望实例化里面的参数是从配置中获取的,这样可以方便的调整配置参数。
先看看Setter长什么样:

image.png

比如你的配置是这样的:

requestSystemA = {
    "groupKey": "requestSystemA", 
    "commandKey": "requestSystemA", 
    "threadPoolKey": "requestSystemA", 
    "commandPropertiesDefaults": {
        "executionTimeoutInMilliseconds": 2000, 
        "executionTimeoutEnabled": true
    }, 
    "threadPoolPropertiesDefaults": {
        "coreSize": 10, 
        "maxQueueSize": 100, 
        "queueSizeRejectionThreshold": 50
    }
}

requestSystemB = {
    "groupKey": "requestSystemB", 
    "commandKey": "requestSystemB", 
    "threadPoolKey": "requestSystemB", 
    "commandPropertiesDefaults": {
        "executionTimeoutInMilliseconds": 3000, 
        "executionTimeoutEnabled": true
    }, 
    "threadPoolPropertiesDefaults": {
        "coreSize": 20, 
        "maxQueueSize": 200, 
        "queueSizeRejectionThreshold": 50
    }
}

requestSystemC = {
    "groupKey": "requestSystemC", 
    "commandKey": "requestSystemC", 
    "threadPoolKey": "requestSystemC", 
    "commandPropertiesDefaults": {
        "executionTimeoutInMilliseconds": 2000, 
        "executionTimeoutEnabled": true
    }, 
    "threadPoolPropertiesDefaults": {
        "coreSize": 5, 
        "maxQueueSize": 5, 
        "queueSizeRejectionThreshold": 3
    }
}

这时候你的代码可能如下:

public class Bean {
    // 需要请求的系统
    private String requestSystem;
    
    // 省略其他属性和setter/getter方法
}

public class Serivce() {
    public String doService(Bean bean) {
        // 每个请求的外部系统一个setter实例
        Setter setter = getSetterFromRequestSystem(bean.getRequestSystem());
        HelloCommand command = new HelloCommand(setter);
        return command.execute();
    }
    
    public Setter getSetterFromRequestSystem(String requestSystem) {
        // 通过requestSystem获取到配置并实例化成Setter
        return getSetterFromConfig(getConfig(requestSystem));
    }
    
    public Setter getSetterFromConfig(String json) {
        return jsonToObject(json, Setter.class);
    }
}

看到这里,问题就变成了如何把配置信息变成Setter实例的问题了。

总结

本文讲解了两种实现线程隔离的方式,当然@HystrixCommand注解也能实现线程隔离,可以保证每个方法是一个独立的熔断器。

扩展

HystrixCommand逻辑时序:

com.netflix.hystrix.HystrixCommand#execute
-> com.netflix.hystrix.HystrixCommand#queue
-> com.netflix.hystrix.AbstractCommand#toObservable
-> com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
    circuitBreaker.attemptExecution()
    executionSemaphore.tryAcquire()
-> com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
-> com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
-> com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
-> com.netflix.hystrix.HystrixCommand#getExecutionObservable
    Observable.just(run())

HystrixCommand线程池逻辑时序:

com.netflix.hystrix.HystrixCommand#execute
 -> com.netflix.hystrix.HystrixCommand#queue
    toFuture()
-> rx.internal.operators.OperatorSubscribeOn#call
-> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextSchedulerWorker#schedule(rx.functions.Action0)
-> com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.ThreadPoolWorker#schedule(rx.functions.Action0)
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();// HystrixThreadPool
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

@HystrixCommand注解原理

直接看com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect类即可。com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect#methodsAnnotatedWithHystrixCommand方法拦截@HystrixCommand@HystrixCollapser注解。

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // 通过jointPoint获取@HystrixCommand注解的信息,创建MetaHolder实例
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // 通过MetaHolder实例创建HystrixInvokable(HystrixCommand的父接口)
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            // 调用execute()方法,跟上面分析HystrixCommand#execute方法基本一致
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause() != null ? e.getCause() : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

那么@HystrixCommand的commandKey是什么呢?
答案是方法名。

// com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect.CommandMetaHolderFactory#create
public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
    HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
    ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
    MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
    if (isCompileWeaving()) {
        builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
    }
    return builder.defaultCommandKey(method.getName())// commandKey设置为方法名
                    .hystrixCommand(hystrixCommand)
                    .observableExecutionMode(hystrixCommand.observableExecutionMode())
                    .executionType(executionType)
                    .observable(ExecutionType.OBSERVABLE == executionType)
                    .build();
}

当然你也可以通过@HystrixCommand(commandKey = "xxx")设置。

参考
【SpringCloud】HystrixCommand的threadPoolKey默认值及线程池初始化
从源码看hystrix的工作原理
Hystrix源码解析
Hystrix问题记录
Spring Cloud系列--Hystrix源码
Spring Cloud 源码学习之 Hystrix 工作原理
How-it-Works
Hystrix执行过程源码分析

相关文章

  • HystrixCommand实现线程隔离

    Hystrix的基本使用 线程隔离原理 存在全局的线程池线程池缓存,根据threadPoolKey来区分不同的线程...

  • 断路器模式

    依赖隔离 Hystrix的依赖隔离采用了线程池隔离方式,会为每个HystrixCommand创建一个独立的线程池,...

  • ThreadLocal 解析

    ThreadLocal 每个线程都有一个 ThreadLocal 变量的副本,实现了线程的隔离。 使用 实现 ru...

  • Hystrix 隔离策略细粒度控制

    Hystrix 隔离策略细粒度控制 Hystrix 实现资源隔离,有两种策略: 线程池隔离 信号量隔离 对资源隔离...

  • Hystrix - Hystrix 隔离策略细粒度控制

    Hystrix 隔离策略细粒度控制 Hystrix 实现资源隔离,有两种策略: 线程池隔离 信号量隔离 对资源隔离...

  • 用 Hystrix 构建高可用服务架构(中)

    4.Hystrix 隔离策略细粒度控制 Hystrix 实现资源隔离,有两种策略: 线程池隔离 信号量隔离 对资源...

  • Java ThreadLocal 的用法

    前言 ThreadLocal提供了线程局部变量,当前线程全局共享,线程隔离。 源码实现 线程局部变量是存储在Thr...

  • Java - ThreadLocal详细讲解

    ThreadLocal常用来做线程隔离,下面将对ThreadLocal的实现原理、设计理念、内部实现细节(Map、...

  • ThreadLocal剖析

    ThreadLocal可以在多线程下实现各个线程的数据隔离 存储原理 直接看ThreadLocal的get()方法...

  • 源码阅读 - ThreadLocal

    0.什么是ThreadLocal 线程本地变量,线程间读写同一个ThreadLocal实例是线程隔离的。 1.实现...

网友评论

    本文标题:HystrixCommand实现线程隔离

    本文链接:https://www.haomeiwen.com/subject/ksmzfctx.html