上篇文章介绍了hystrix的使用,当一个@HystrixCommand被执行时,可以有两种不同的隔离策略:THREAD(线程)和SEMAPHORE(信号量)。在使用THREAD(线程)隔离策略的时候,由于每个hystrix命令都在一个单独的线程池中执行,这样的话就获取不到父线程的上线文。
示例
这里我简单写了一个ThreadLocal的例子:
/**
* threadLocal问题
*
* @author hui.wang
* @since 18 November 2018
*/
public class ThreadLocalTest {
static ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
THREAD_LOCAL.set("测试");
new A().run();
new C().run();
}).start();
}
}
class A {
public void run() {
System.out.println("=========方式1==============");
System.out.println("thread name :" + Thread.currentThread().getName());
System.out.println("A get ThreadLocal :" + ThreadLocalTest.THREAD_LOCAL.get());
new B().run();
}
}
class B {
public void run() {
System.out.println("************");
System.out.println("thread name :" + Thread.currentThread().getName());
System.out.println("B get ThreadLocal :" + ThreadLocalTest.THREAD_LOCAL.get());
}
}
class C {
public void run() {
System.out.println("=========方式2==============");
System.out.println("thread name :" + Thread.currentThread().getName());
System.out.println("C get ThreadLocal :" + ThreadLocalTest.THREAD_LOCAL.get());
new Thread(() -> new D().run()).start();
}
}
class D {
public void run() {
System.out.println("************");
System.out.println("thread name :" + Thread.currentThread().getName());
System.out.println("D get ThreadLocal :" + ThreadLocalTest.THREAD_LOCAL.get());
}
}
打印结果为:
=========方式1==============
thread name :Thread-0
A get ThreadLocal :测试
************
thread name :Thread-0
B get ThreadLocal :测试
=========方式2==============
thread name :Thread-0
C get ThreadLocal :测试
************
thread name :Thread-1
D get ThreadLocal :null
可以看到,在C.class
里面开启了一个线程执行D.class
方法,这时候D.class
是获取不到父线程的上线文的。
在我们生产环境经常使用的是,用filter拦截状态信息(例如登录态信息),然后存放在ThreadLocal中,这个时候被@HystrixCommand
(使用线程策略)修饰的方法时获取不到这个ThreadLocal的。
解决方案
- 准备开发环境
定义一个UserInfo
的类用来存放用户传递的上下文信息
public class UserInfo implements Serializable{
public static final String USER_ID = "user_id";
public static final String USER_TOKEN = "user_token";
private String userId;
private String userToken;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getUserToken() {
return userToken;
}
public void setUserToken(String userToken) {
this.userToken = userToken;
}
@Override
public String toString() {
return "UserInfo{" +
"userId='" + userId + '\'' +
", userToken='" + userToken + '\'' +
'}';
}
}
接着创建ThreadLocal保存用户上下文信息
/**
* 保存用户信息上线文
*
* @author hui.wang
* @since 18 November 2018
*/
public class UserContext {
private static final ThreadLocal<UserInfo> USER_CONTEXT = new ThreadLocal<>();
/**
* 获取用户信息
*/
public static UserInfo getUserInfo() {
UserInfo userInfo = USER_CONTEXT.get();
if (Objects.isNull(userInfo)) {
userInfo = create();
USER_CONTEXT.set(userInfo);
}
return USER_CONTEXT.get();
}
private static UserInfo create() {
return new UserInfo();
}
/**
* 保存用户信息
*/
public static void setContext(UserInfo userInfo) {
Assert.notNull(userInfo, "Only non-null UserContext instances are permitted");
USER_CONTEXT.set(userInfo);
}
/**
* 销毁
*/
public static void clean() {
USER_CONTEXT.remove();
}
}
然后配置filter拦截每次的用户信息保存到ThreadLocal中
/**
* 模拟用户信息filter
*
* @author hui.wang
* @since 18 November 2018
*/
@Component
public class UserContextFilter implements Filter{
private static final Logger LOGGER = LoggerFactory.getLogger(UserContextFilter.class);
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// not do
}
/**
* 实现doFilter方法
* 这里将用户信息保存到上下文中,并在请求结束后clear
*/
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
try {
try {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
if (StringUtils.isNotEmpty(httpServletRequest.getHeader(UserInfo.USER_ID)) &&
StringUtils.isNotEmpty(httpServletRequest.getHeader(UserInfo.USER_TOKEN))) {
UserContext.getUserInfo().setUserId(httpServletRequest.getHeader(UserInfo.USER_ID));
UserContext.getUserInfo().setUserToken(httpServletRequest.getHeader(UserInfo.USER_TOKEN));
LOGGER.info("filter set userInfo success");
}
} catch (Exception e) {
LOGGER.error("filter set userInfo error", e);
}
filterChain.doFilter(servletRequest, servletResponse);
} finally {
UserContext.clean();
}
}
@Override
public void destroy() {
}
}
搭建完成后,只要在请求的header里面添加上user_id
和user_token
字段后,filter就可以将这两个信息保存到上下文中
写一个测试controller:
/**
* 测试filter
* @see com.hui.wang.spring.cloud.filter.UserContextFilter
*
* @author hui.wang
* @since 18 November 2018
*/
@RestController
public class FilterController {
private final Logger LOGGER = Logger.getLogger(FilterController.class);
@Autowired
private RestTemplate restTemplate;
@RequestMapping("/userFilter/v1")
public UserInfo userFilter() {
UserInfo userInfo = UserContext.getUserInfo();
return userInfo;
}
}
访问:
curl -H'user_id:hui.wang' -H'user_token:123' http://localhost:8111/userFilter/v1
返回:
{
"userId":"hui.wang",
"userToken":"123"
}
这样filter部门和上线文部分完成,这个使用如果直接使用@HystrixCommand
命令,在执行过程使用UserContext.getUserInfo()
是获取不到上下文的
2.自定义Hystrix并发策略
编写ThreadLocalAwareStrategy
自定义的并发策略类
/**
* 自定义并发Hystrix策略
*
* @author hui.wang
* @since 18 November 2018
*/
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy{
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
public ThreadLocalAwareStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
public ThreadLocalAwareStrategy() {
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue) : super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize) : super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContext.getUserInfo())) : super.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContext.getUserInfo()));
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null ? existingConcurrencyStrategy.getRequestVariable(rv) : super.getRequestVariable(rv);
}
}
在wrapCallable
方法里面需要提供一个Java Callcable
类将用户信息上线文注入到Hystrix中
/**
* 定义一个Java callable 类,将UserContext注入Hystrix命令中
*
* @author hui.wang
* @since 18 November 2018
*/
public class DelegatingUserContextCallable<V> implements Callable<V>{
private final Logger LOGGER = Logger.getLogger(DelegatingUserContextCallable.class);
private final Callable<V> delegate;
private UserInfo originalUserInfo;
public DelegatingUserContextCallable(Callable<V> delegate, UserInfo userInfo) {
Assert.notNull(delegate, "delegate cannot be null");
Assert.notNull(userInfo, "userContext cannot be null");
this.delegate = delegate;
this.originalUserInfo = userInfo;
}
public DelegatingUserContextCallable(Callable<V> delegate) {
this(delegate, UserContext.getUserInfo());
}
@Override
public V call() throws Exception {
UserContext.setContext(originalUserInfo);
try {
return delegate.call();
} finally {
this.originalUserInfo = null;
}
}
public static <V> Callable<V> create(Callable<V> delegate, UserInfo userContext) {
return new DelegatingUserContextCallable<V>(delegate, userContext);
}
}
接着配置我们自定义的Hystrix并发策略
/**
* 配置自定义Hystrix并发策略
*
* @author hui.wang
* @since 18 November 2018
*/
@Configuration
public class ThreadLocalConfiguration {
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixPlugins.reset();
//注册自定义的Hystrix并发策略
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}
这样就在Hystrix中访问上下文信息了
/**
*
* @author hui.wang
* @since 18 November 2018
*/
@RestController
public class HystrixController {
private final Logger LOGGER = Logger.getLogger(HystrixController.class);
@Autowired
private RestTemplate restTemplate;
@HystrixCommand(
threadPoolKey = "hystrix-v1",
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "10")
}
)
@RequestMapping("/hystrix/v1")
public String hystrixV1() {
UserInfo userInfo = UserContext.getUserInfo();
LOGGER.info("=========================");
LOGGER.info("userInfo = " + userInfo.toString());
LOGGER.info("=========================");
return restTemplate.getForEntity("http://provider-server/provider?request={1}", String.class, "test").getBody();
}
}
可以看到我们日志打印出的上下文信息
网友评论