美文网首页
Hystrix之ThreadLocal上下文传播

Hystrix之ThreadLocal上下文传播

作者: 汪先森出版社 | 来源:发表于2018-09-10 13:56 被阅读0次

引言

ThreadLocal

ThreadLocal这个类给线程提供了一个本地变量,这个变量是该线程自己拥有,各线程间不共享。在该线程存活和ThreadLocal实例能访问的时候,保存了对这个变量副本的引用。当线程消失的时候,所有的本地实例都会被GC。并且建议ThreadLocal最好是使用 private static 修饰。

InheritableThreadLocal

InheritableThreadLocal是为了解决子线程获得父线程本地变量的需求,继承自ThreadLocal。如果你使用它,那么保存的所有东西都已经不在原来的threadLocals里面,而是在一个新的叫inheritableThreadLocals变量中。意思就是说每个线程Thread里面还有一个Map变量,名叫inheritableThreadLocals,它保存的是需要传递的引用(通过InheritableThreadLocal设置的线程变量)。

这种父子传递的需求还是有些比较重要的应用场景,如上下文传递(用户标识、事务等),调用日志跟踪等。Log4j中的MDC就是基于InheritableThreadLocal实现。但一般来说我们用线程池比较多,线程池会缓存线程,重复使用,线程可能会执行不同的任务。这样一来它的上下文传递就达不到正确的效果。

TransmittableThreadLocal

阿里巴巴有个开源项目就是为了解决线程池中变量传递,它里面有个叫TransmittableThreadLocal,继承于InheritableThreadLocal。它通过包装返回Runnable的方式代理了run方法,在run之前copy装载线程变量,run之后清除线程变量,来实现此功能。TransmittableThreadLocal除了继承过来的线程Map,它还定义了一个名叫holder的InheritableThreadLocal静态变量,也就是说TransmittableThreadLocal有两套线程变量。

但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。

HystrixConcurrencystrategy

Hystrixconcurrencystrategy是Hystrix的线程池创建源码所在,并且提供方法wrapCallable来装饰线程池执行环境。所以我们我们可以自定义一个并发策略,即可于Hystrix应用场景内实现线程上下文的传播。附wrapCallable源码

/**
 * Provides an opportunity to wrap/decorate a {@code Callable<T>} before execution.
 * <p>
 * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).
 * <p>
 * <b>Default Implementation</b>
 * <p>
 * Pass-thru that does no wrapping.
 *
 * @param callable
 *            {@code Callable<T>} to be executed via a {@link ThreadPoolExecutor}
 * @return {@code Callable<T>} either as a pass-thru or wrapping the one given
 */
public <T> Callable<T> wrapCallable(Callable<T> callable) {
    return callable;
}

Ps:注释上有一句这么说的:This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}).

拓展

既然这个方法可以装饰线程池回调,那么我们亦可定义一个装饰器接口,只要这个接口的实现类,都会通过上述并发策略装饰不同业务不同场景需要的线程变量。

装饰器接口定义

/**
 * Hystrix CallBack 装饰器定义
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:18
 **/
public interface HystrixCallableWrapper {

    /**
     * 装饰 Callable实例
     *
     * @param callable
     *            待装饰实例
     * @param <T>
     *            返回类型
     * @return 装饰后的实例
     */
    <T> Callable<T> wrap(Callable<T> callable);
}

HystrixConcurrencystrategy Custom

/**
 * Hystrix并发策略
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:06
 **/
public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy {

    /** 装饰队列 */
    private final List<HystrixCallableWrapper> wrappers;

    public HystrixConcurrencyStrategyCustom(List<HystrixCallableWrapper> wrappers) {
        this.wrappers = wrappers;
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new CallableWrapperChain(callable, this.wrappers).wrapCallable();
    }

    /**
     * callback 调用链
     *
     * @author wangzhuhua
     * @date 2018/09/07 下午4:38
     **/
    private static class CallableWrapperChain<T> {

        /** 回调 */
        private final Callable<T> callable;
        /** 回调包装 */
        private final List<HystrixCallableWrapper> wrappers;

        CallableWrapperChain(Callable<T> callable, List<HystrixCallableWrapper> wrappers) {
            this.callable = callable;
            this.wrappers = wrappers;
        }

        /**
         * 装饰线程hystrix callable
         * 
         * @return {@link Callable}
         */
        Callable<T> wrapCallable() {
            Callable<T> delegate = callable;
            for (HystrixCallableWrapper wrapper : wrappers) {
                delegate = wrapper.wrap(delegate);
            }
            return delegate;
        }
    }

}

Configuration

/**
 * Hystrix配置
 *
 * @author wangzhuhua
 * @date 2018/09/07 下午4:01
 **/
@Configuration
@ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")
public class HystrixConfiguration {

    @Autowired(required = false)
    private List<HystrixCallableWrapper> wrappers = new ArrayList<>();

    @PostConstruct
    public void init() {
        HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers));
    }

}

实现示例

/**
 * Token 装饰器
 *
 * @author wangzhuhua
 * @date 2018/09/10 上午11:28
 **/
@Component
public class TokenWrapper implements HystrixCallableWrapper {
    @Override
    public <T> Callable<T> wrap(Callable<T> callable) {
        return new TokenAwareCallable(callable, TokenHandler.getToken());
    }

    /**
     * Token装饰
     *
     * @param <T>
     */
    static class TokenAwareCallable<T> implements Callable<T> {

        /** 回调代理 */
        private final Callable<T> delegate;
        /** Token */
        private final String token;

        TokenAwareCallable(Callable<T> callable, String token) {
            this.delegate = callable;
            this.token = token;
        }

        @Override
        public T call() throws Exception {
            try {
                // 填充当前线程变量
                TokenHandler.setToken(this.token);
                return delegate.call();
            } finally {
                TokenHandler.remove();
            }
        }
    }
}

其中TokenHandler内使用线程变量存储

相关文章

网友评论

      本文标题:Hystrix之ThreadLocal上下文传播

      本文链接:https://www.haomeiwen.com/subject/djrsgftx.html