使用场景,往往我们使用theadLocal存取用户登陆信息,但是当开启hystrix时使用线程隔离模式,会使用对应线程池内的线程执行feignClient的方法,那么就会导致threadLocal丢失
通过百度以及看源码可以发现hystrix提供了HystrixPlugins,可以看到他的方法
我们先来看看HystrixPlugins暴露的方法
// 可以看到 HystrixPlugins 提供了很多东西,包括线程策略,钩子,事件,等等。。
// 首先第一感觉 钩子是可以用的
HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
首先看看HystrixCommandExecutionHook 的方法,以及执行时机
/**
* Invoked before {@link HystrixInvokable} begins executing.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.2
*/
public <T> void onStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param failureType {@link FailureType} enum representing which type of error
* @param e exception object
*
* @since 1.2
*/
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
return e; //by default, just pass through
}
/**
* Invoked when {@link HystrixInvokable} finishes a successful execution.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
*
* @param commandInstance The executing HystrixCommand instance.
*
* @since 1.2
*/
public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
* This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
* naturally, or was unsubscribed externally
*
* @param commandInstance The executing HystrixCommand instance.
*
* @since 1.2
*/
public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
// do nothing by default
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param e exception object
*
* @since 1.4
*/
public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
return e; //by default, just pass through
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} starts.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.2
*/
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} emits a value.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param value value emitted
*
* @since 1.4
*/
public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
return value; //by default, just pass through
}
/**
* Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
*
* @param commandInstance The executing HystrixInvokable instance.
* @param e exception object
*
* @since 1.2
*/
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
//by default, just pass through
return e;
}
/**
* Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.4
*/
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
*
* @param commandInstance The executing HystrixCommand
*
* @since 1.4
*/
public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
/**
* Invoked with the command is unsubscribed before a terminal state
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.5.9
*/
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
//do nothing by default
}
通过查找文档以及 测试确定了我们需要的方法,这是我们可以写一个自己实现的钩子,但是这个钩子是用不同的执行时机的回调来实现,那么我们需要想办法把需要传递的信息从前面钩子的方法传到后面的钩子方法,简单的方式就是用ConcurrentMap来进行映射,但是没有合适的key,那么我们找到了一个类HystrixRequestContext,其实使用ConcurrentMap映射是一个比较蠢的方式,也就是实在没办法了再这样,正常应该是像TTL那样做一个装饰器,在成员变量里操作,或者统一的地方,那么我们又继续找到了HystrixRequestVariableDefault 这个类在操作HystrixRequestContext,再看一下源码,它是通过包内方法来操作HystrixRequestContext的threadLocal变量
public void set(T value) {
// 这个方法没有暴露给我们使用,而是提供了一个操作器
HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
}
下面看看我们的钩子模样
public class MyHystrixHook extends HystrixCommandExecutionHook {
private final HystrixRequestVariableDefault <Long> reuqestVariable = new HystrixRequestVariableDefault<>();
@Override
public <T> void onStart(HystrixInvokable<T> commandInstance) {
//这里是hystrix 执行对应feignClient方法时开始时的钩子,那么就是在当前主线程内操作
// 先初始化 hystrix的线程私有变量容器
HystrixRequestContext.initializeContext();
// 你要传递的 变量,其实就是从当前主线程的threadLocal中获取到的
reuqestVariable.set(111L);
}
@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
// 这里是你的 FeignClient方法执行失败后的回调钩子,需要清理你操作的数据
// 需要清理 子线程的threadLocal以及HystrixRequestContext
HystrixRequestContext.getContextForCurrentThread().shutdown();
return e; //by default, just pass through
}
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
// 这里是你的 FeignClient方法执行成功后的回调钩子,需要清理你操作的数据
// 需要清理 子线程的threadLocal以及HystrixRequestContext
HystrixRequestContext.getContextForCurrentThread().shutdown();
}
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
// 这里已经是子线程了,执行目标 feignClient方法前
Long tenantId = requestVariableDefault.get();
// 这里你可以获取到想要的信息,可以存到threadLocal保证后续方法的一致性
}
@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
// 这里是熔断回调方法开始,也可以管理threadLocal的信息传递, 和上述onStart方法一致
}
@Override
public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
//by default, just pass through
// 熔断回调失败 的钩子,清理即可
return e;
}
@Override
public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
// 熔断回调成功 的钩子,清理即可
}
}
同时怎么才能将我们自定义的钩子让框架去调用呢?
// 直接在项目启动时 调用这个方法 会报错,那么直接这样是不行的。
// 而且我们也可以看到HystrixPlugins.reset() 方法重置,那么再看看getInstance方法内部
HystrixPlugins.getInstance().registerCommandExecutionHook(你的钩子);
hystrix的spi体系
下面来看钩子的实例获取方法
public HystrixCommandExecutionHook getCommandExecutionHook() {
// 这里可以看到hystrix各种类型的插件默认只有一个,但是我们可以在自定义的插件套一层装饰器来实现多个相同类型的插件对方法进行增强(seata集成hystrix中使用到此方式)
if (commandExecutionHook.get() == null) {
// check for an implementation from Archaius first
// 内部spi的方式获取实例
Object impl = getPluginImplementation(HystrixCommandExecutionHook.class);
if (impl == null) {
// cas的方式来保证多个组件或实现只有一个实例正常放入
commandExecutionHook.compareAndSet(null, HystrixCommandExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from Archaius so use it
commandExecutionHook.compareAndSet(null, (HystrixCommandExecutionHook) impl);
}
}
return commandExecutionHook.get();
}
private <T> T getPluginImplementation(Class<T> pluginClass) {
// hystrix 自己提供的 配置文件spi方式获取实例
T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
if (p != null) return p;
// 利用jdk 自动的 serviceLoader方式获取实例
// serviceLoader可以参考 https://www.jianshu.com/p/ccfd80d407ef
return findService(pluginClass, classLoader);
}
我们来看看hystrix配置方式的spi, 其实就是读取hystrix-plugins.properties中文件读取到key value,例如我们替换hook的配置就为
hystrix.plugin.HystrixConcurrencyStrategy.implementation=xxx.xxx.MyTestHystrix
private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
String classSimpleName = pluginClass.getSimpleName();
// Check Archaius for plugin class.
String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
String implementingClass = dynamicProperties.getString(propertyName, null).get();
if (implementingClass != null) {
try {
Class<?> cls = Class.forName(implementingClass);
// narrow the scope (cast) to the type we're expecting
cls = cls.asSubclass(pluginClass);
return (T) cls.newInstance();
} catch (ClassCastException e) {
throw new RuntimeException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass);
} catch (ClassNotFoundException e) {
throw new RuntimeException(classSimpleName + " implementation class not found: " + implementingClass, e);
} catch (InstantiationException e) {
throw new RuntimeException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);
} catch (IllegalAccessException e) {
throw new RuntimeException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);
}
} else {
return null;
}
}
关于seata集成hystrix以及feign的实现
上述使用钩子 + spi 替换自己的钩子方式实现,但是我们还可以观察到HystrixConcurrencyStrategy#wrapCallable方法,这不就是妥妥的一个线程执行器的装饰器预留的方法嘛?很显然也可以通过这种方式来实现
如果你引入了spring-cloud-starter-alibaba-seata的话,可以看到线程策略的一个实现com.alibaba.cloud.seata.feign.hystrix.SeataHystrixConcurrencyStrategy
下面来看看代码
public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private final Logger logger = LoggerFactory
.getLogger(SeataHystrixConcurrencyStrategy.class);
// 这里是seata自定义线程策略的被装饰的对象,那么其实是允许多个同样插件存在,不过是通过装饰器包裹后层层增强
private HystrixConcurrencyStrategy delegate;
public SeataHystrixConcurrencyStrategy() {
try {
// 这里通过spi获取实例
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof SeataHystrixConcurrencyStrategy) {
// 如果已经是当前实现则不做任何操作
return;
}
// 这里会重新获取所有其它插件,然后重置后统一再注册进去
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
}
catch (Exception ex) {
logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
}
}
private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
HystrixMetricsPublisher metricsPublisher,
HystrixPropertiesStrategy propertiesStrategy) {
if (logger.isDebugEnabled()) {
logger.debug("Current Hystrix plugins configuration is ["
+ "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
+ eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
+ "propertiesStrategy [" + propertiesStrategy + "]," + "]");
logger.debug("Registering Seata Hystrix Concurrency Strategy.");
}
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegate.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return this.delegate.getRequestVariable(rv);
}
// 这里是上面提到的 线程执行的装饰增强
@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
// 如果是已经装饰了 提前返回
if (c instanceof SeataContextCallable) {
return c;
}
Callable<K> wrappedCallable;
if (this.delegate != null) {
// 如果有其它的自定义的插件,需要再被装饰一层
wrappedCallable = this.delegate.wrapCallable(c);
}
else {
wrappedCallable = c;
}
// 如果是已经装饰了 提前返回
if (wrappedCallable instanceof SeataContextCallable) {
return wrappedCallable;
}
// 真正的对其装饰
return new SeataContextCallable<>(wrappedCallable,
RequestContextHolder.getRequestAttributes());
}
private static class SeataContextCallable<K> implements Callable<K> {
private final Callable<K> actual;
// 哈哈,我们看到了 seata对线程策略的线程执行装饰的真正目的,用于传递seata全局事务id
private final String xid;
private final RequestAttributes requestAttributes;
SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;
this.requestAttributes = requestAttribute;
// 当前还是主线程在执行,所以直接从当前线程获取全局事务id
this.xid = RootContext.getXID();
}
@Override
public K call() throws Exception {
// 典型的装饰增强,这里已经是子线程在执行了
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
RootContext.bind(xid);
return actual.call();
}
finally {
RootContext.unbind();
RequestContextHolder.resetRequestAttributes();
}
}
}
}
那么我们自己可不可以用他这种方式实现threadLocal传递呢,答案是可以的,但是要注意的是循环创建问题,我们添加了一个volatile 变量防止自定义插件的构造方法中通过spi获取实例形成循环创建实例
// 逻辑和seata的自定义插件基本一致
public class MyTestHystrix extends HystrixConcurrencyStrategy {
private final Logger logger = LoggerFactory
.getLogger(SeataHystrixConcurrencyStrategy.class);
private HystrixConcurrencyStrategy delegate;
private static volatile boolean alreadyInit = false;
public MyTestHystrix() {
// 这里添加一个 volatile变量防止构造方法和spi获取实例形成循环
if (alreadyInit) {
return;
}
alreadyInit = true;
try {
this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegate instanceof MyTestHystrix) {
return;
}
HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
propertiesStrategy);
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance()
.registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception ex) {
logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
}
}
@Override
public <K> Callable<K> wrapCallable(Callable<K> c) {
if (c instanceof MyTestHystrix.MyContext) {
return c;
}
Callable<K> wrappedCallable;
if (this.delegate != null) {
wrappedCallable = this.delegate.wrapCallable(c);
} else {
wrappedCallable = c;
}
if (wrappedCallable instanceof MyTestHystrix.MyContext) {
return wrappedCallable;
}
return new MyTestHystrix.MyContext<>(wrappedCallable,
RequestContextHolder.getRequestAttributes());
}
private static class MyContext<K> implements Callable<K> {
private final Callable<K> actual;
private final String tenantId;
private final RequestAttributes requestAttributes;
MyContext(Callable<K> actual, RequestAttributes requestAttribute) {
this.actual = actual;
this.requestAttributes = requestAttribute;
this.tenantId = LoginInfoUtils.getTenantId();
}
@Override
public K call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
// 这里就是手动的将当前子线程信息填充
LoginInfoUtils.fillLoginInfo(null, this.tenantId, null);
return actual.call();
} finally {
// 清理或者 还原(TTL的重放逻辑是还原)
LoginInfoUtils.remove();
RequestContextHolder.resetRequestAttributes();
}
}
}
}
以上就是另一种方式来实现threadLocal的传递,但是据其它文章描述通过callable的wrap方式并不能覆盖到hystrix的fallback,而钩子的自定义实现可以,本人没有去考证,因为直接使用的钩子方式
seata集成feign以及hystrix
先看hystrix的集成
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HystrixCommand.class)
public class SeataHystrixAutoConfiguration {
// 通过注入bean的方式 使用自定义的线程策略,这样的好处是可以通过装饰器来拼装多个自定义实现的插件
@Bean
SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {
return new SeataHystrixConcurrencyStrategy();
}
}
再来看feign的集成
再bean的注入会一层层的进行装饰器增强来完成我们需要的目标,即处理全局事务id
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class SeataFeignClientAutoConfiguration {
// 如果开启了hystrix,返回一个feign构造器
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
return SeataHystrixFeignBuilder.builder(beanFactory);
}
// 如果使用了阿里的 sentinal 返回一个feign构造器
@Bean
@Scope("prototype")
@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
return SeataSentinelFeignBuilder.builder(beanFactory);
}
// 如果没有开启hystrix和sentinal 返回一个只有feign装饰器的feign构造器实例
@Bean
@ConditionalOnMissingBean
@Scope("prototype")
Feign.Builder feignBuilder(BeanFactory beanFactory) {
return SeataFeignBuilder.builder(beanFactory);
}
@Configuration(proxyBeanMethods = false)
protected static class FeignBeanPostProcessorConfiguration {
// 一个 beanPostProcessor 对所有spring 管理的bean进行选择性的装饰
@Bean
SeataBeanPostProcessor seataBeanPostProcessor(
SeataFeignObjectWrapper seataFeignObjectWrapper) {
return new SeataBeanPostProcessor(seataFeignObjectWrapper);
}
// 对于feignContext修饰的一个 拦截处理器
@Bean
SeataContextBeanPostProcessor seataContextBeanPostProcessor(
BeanFactory beanFactory) {
return new SeataContextBeanPostProcessor(beanFactory);
}
// seata自己的真正选择某些bean进行装饰的类
@Bean
SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
return new SeataFeignObjectWrapper(beanFactory);
}
}
}
SeataContextBeanPostProcessor 和 SeataBeanPostProcessor 分别为feignContext,feignClient装饰,这两个都是利用了beanPostProcessor这个钩子所有spring管理的bean都会判断是否需要处理,如果是想要处理的bean则进行装饰
下面来看看SeataFeignObjectWrapper装饰逻辑
// 首先是一个包内可调用的方法,我们自己业务服务内是调用不到的噢,这是封装的一种保护机制
Object wrap(Object bean) {
// 分别有两种feignClient进行装饰
if (bean instanceof Client && !(bean instanceof SeataFeignClient)) {
if (bean instanceof LoadBalancerFeignClient) {
LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
return new SeataLoadBalancerFeignClient(client.getDelegate(), factory(),
clientFactory(), this);
}
if (bean instanceof FeignBlockingLoadBalancerClient) {
FeignBlockingLoadBalancerClient client = (FeignBlockingLoadBalancerClient) bean;
return new SeataFeignBlockingLoadBalancerClient(client.getDelegate(),
beanFactory.getBean(BlockingLoadBalancerClient.class), this);
}
return new SeataFeignClient(this.beanFactory, (Client) bean);
}
return bean;
}
feignContext的装饰最后也是为了继续装饰feign,这种逻辑见到了很多,例如TTL 等一层层的装饰为了覆盖所有情况,保证目标要被装饰的实例各种情况一定被装饰到
下面看看装饰后干了些啥
public class SeataFeignClient implements Client {
private final Client delegate;
private final BeanFactory beanFactory;
private static final int MAP_SIZE = 16;
SeataFeignClient(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
this.delegate = new Client.Default(null, null);
}
SeataFeignClient(BeanFactory beanFactory, Client delegate) {
// 被装饰的client
this.delegate = delegate;
this.beanFactory = beanFactory;
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
// 调用时增强方法
Request modifiedRequest = getModifyRequest(request);
return this.delegate.execute(modifiedRequest, options);
}
private Request getModifyRequest(Request request) {
// 目的明确还是处理 全局事务id
String xid = RootContext.getXID();
if (StringUtils.isEmpty(xid)) {
return request;
}
Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
headers.putAll(request.headers());
List<String> seataXid = new ArrayList<>();
seataXid.add(xid);
headers.put(RootContext.KEY_XID, seataXid);
return Request.create(request.method(), request.url(), headers, request.body(),
request.charset());
}
}
RestTemplate的拦截器,我们发现seata在装饰feignClient的同时也对RestTemplate进行了拦截,防止我们的项目直接使用RestTemplate进行接口调用
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
// 注入我们的拦截器
return new SeataRestTemplateInterceptor();
}
// RestTemplate可能存在多个
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
@PostConstruct
public void init() {
if (this.restTemplates != null) {
// 对所有注入的RestTemplate 进行拦截
for (RestTemplate restTemplate : restTemplates) {
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
interceptors.add(this.seataRestTemplateInterceptor);
restTemplate.setInterceptors(interceptors);
}
}
}
}
我们再来看看SeataRestTemplateInterceptor
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
// 目标还是非常明确,对全局事务id进行处理,放入header中
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) {
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}
}
最后我们再看一眼 被调用接口如何处理吧,还是我们非常熟悉的HandlerInterceptor
public class SeataHandlerInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory
.getLogger(SeataHandlerInterceptor.class);
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) {
// 请求进入前取出header的 全局事务id放入 threadLocal中
String xid = RootContext.getXID();
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (log.isDebugEnabled()) {
log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
}
if (StringUtils.isBlank(xid) && rpcXid != null) {
RootContext.bind(rpcXid);
if (log.isDebugEnabled()) {
log.debug("bind {} to RootContext", rpcXid);
}
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception e) {
// 请求完成清理 threadLocal信息
if (StringUtils.isNotBlank(RootContext.getXID())) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (StringUtils.isEmpty(rpcXid)) {
return;
}
String unbindXid = RootContext.unbind();
if (log.isDebugEnabled()) {
log.debug("unbind {} from RootContext", unbindXid);
}
// 如果解绑和绑定的 全局事务id不同,则对后面的全局事务id再次进行绑定,存入threadLocal
// 这里可能是处理全局事务冲突的特殊情况,目前不太了解
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
if (unbindXid != null) {
RootContext.bind(unbindXid);
log.warn("bind {} back to RootContext", unbindXid);
}
}
}
}
}
解决hystrix跨线程threadLocal丢失总结
- 通过HystrixCommandExecutionHook + spi 配置让自定义钩子被使用,然后通过不同的执行时机处理threadLocal
- 具体传递threadLocal利用 hystrix内部的threadLocal重放机制,即HystrixRequestContext和HystrixRequestVariableDefault 的使用
- 也可以学习seata集成的方式,利用线程策略HystrixConcurrencyStrategy 的Callable 装饰方法进行装饰增强
- 通过@Bean注入自定义HystrixConcurrencyStrategy 插件后重置原有的注册,并且留了一个可装饰的口子,可以让多个自定义插件层层装饰(hystrix的本身的插件只允许存在一个)
seata继承hystrix和feign
- 如果开启了hystrix/sentinal等注入自定义的client构造装饰器,否则使用默认的装饰器
- 对hystrix线程切换定义了一个可多层装饰的 自定义HystrixConcurrencyStrategy 插件,通过HystrixConcurrencyStrategy#wrapCallable 进行增强
- 对feignContext,feignClient进行装饰,最终目标装饰为SeataFeignClient类将全局事务id放入header
- 对所有注入到spring容器内的RestTemplate进行拦截,将全局事务id放入header
- 通过spring mvc的HandlerInterceptor取出header的事务id放入threadLocal
网友评论