ThreadLocal可以隔离线程变量,在项目中可以用来存储当前线程的上下文信息,例如登录用户、租户等。但是在需要异步执行时,上下文信息需要手动传递,增加了与业务无关的代码。JDK虽然提供了InheritableThreadLocal,但是它只在创建子线程时复制一次父线程的值;在使用线程池时线程会被复用,无法用新值覆盖旧值,导致数据不准确。阿里开源了TransmittableThreadLocal,用于支持线程池下的异步信息传递。
在微服务的异步调用时一些公共信息可以使用TransmittableThreadLocal进行传递。
首先创建一个context类存储用户信息
/**
 * Static holder for a single per-thread string value.
 *
 * <p>The value lives in a {@code TransmittableThreadLocal}, exposed through
 * static set/get/remove accessors.
 */
public class UserContext {

    /** Backing thread-local; declared as {@code ThreadLocal} but instantiated as a TTL. */
    private static final ThreadLocal<String> HOLDER = new TransmittableThreadLocal<>();

    /**
     * Binds a value to the current thread.
     *
     * @param s the value to store
     */
    public static void setTtl(String s) {
        HOLDER.set(s);
    }

    /**
     * Reads the value bound to the current thread.
     *
     * @return whatever the backing thread-local returns for this thread
     */
    public static String getTtl() {
        return HOLDER.get();
    }

    /** Removes the value bound to the current thread. */
    public static void removeTtl() {
        HOLDER.remove();
    }
}
配置使用ttl线程池
/**
 * Creates the {@code ttlExecutor} bean: a {@link ThreadPoolTaskExecutor} with core and
 * max pool size 2 and a queue capacity of 10, returned through
 * {@code TtlExecutors.getTtlExecutor(...)}.
 *
 * <p>When the pool rejects a task, {@link ThreadPoolExecutor.CallerRunsPolicy} runs it
 * on the submitting thread instead of throwing.
 *
 * @return the TTL-wrapped executor
 */
@Bean(name = "ttlExecutor")
public Executor ttlExecutor() {
    ThreadPoolTaskExecutor asyncTaskExecutor = new ThreadPoolTaskExecutor();
    asyncTaskExecutor.setMaxPoolSize(2);
    asyncTaskExecutor.setCorePoolSize(2);
    asyncTaskExecutor.setQueueCapacity(10);
    asyncTaskExecutor.setThreadNamePrefix("ttl-async-task-thread-pool-");
    // The rejection handler must be configured BEFORE initialize(): the underlying
    // ThreadPoolExecutor is built inside initialize() using the handler set at that
    // moment, so setting it afterwards leaves the default handler in place.
    asyncTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    asyncTaskExecutor.initialize();
    return TtlExecutors.getTtlExecutor(asyncTaskExecutor);
}
使用HandlerInterceptor拦截请求信息
/**
 * Spring MVC interceptor that reads every header of the incoming request into a map
 * before the handler runs.
 */
public class RequestHandlerInterceptor implements HandlerInterceptor {

    /**
     * Copies each request header (name to the value returned by
     * {@code request.getHeader(name)}) into a local map and lets the request continue.
     *
     * @return always {@code true}, so the handler chain is never interrupted
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        HashMap<String, String> headers = Maps.newHashMapWithExpectedSize(10);
        for (Enumeration<String> names = request.getHeaderNames(); names.hasMoreElements(); ) {
            String headerName = names.nextElement();
            headers.put(headerName, request.getHeader(headerName));
        }
        // NOTE(review): the collected map is never stored or read after this point.
        // Presumably it should be handed to the request-context holder here; confirm
        // against that holder's API, which is not visible in this snippet.
        return true;
    }

    /**
     * Post-request hook; currently does nothing.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {
        // NOTE(review): the cleanup call below is disabled in the original.
        //AsyncRequestContextHolder.resetRequestMap();
    }
}
注册拦截器
/**
 * Web MVC configuration that registers the request-header interceptor for all paths.
 */
public class WebApiAutoConfiguration implements WebMvcConfigurer {

    /**
     * Adds {@code requestHandlerInterceptor} to the registry with the path pattern
     * {@code "/**"}.
     *
     * @param registry the interceptor registry supplied by Spring MVC
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // NOTE(review): requestHandlerInterceptor is not declared in this snippet;
        // presumably it is a field defined or injected elsewhere. Confirm.
        registry
                .addInterceptor(requestHandlerInterceptor)
                .addPathPatterns("/**");
    }
}
feign调用时添加header信息到当前请求
public class FeignAutoConfiguration {
@Bean
public RequestInterceptor tenantInterceptor() {
return template -> {
// 需要清空的信息
// template.removeHeader("");
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (null == attributes) {
Map<String, String> requestMap = AsyncRequestContextHolder.getRequestMap();
requestMap.forEach((name, values) -> {
// 跳过content-length,避免Feign报错(feign.RetryableException: too many bytes written executing ...)
if (name.equals("content-length")) {
return;
}
template.header(name, values);
}
}
else {
HttpServletRequest request = attributes.getRequest();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String values = request.getHeader(name);
// 跳过content-length,避免Feign报错(feign.RetryableException: too many bytes written executing ...)
if (name.equals("content-length")) {
continue;
}
template.header(name, values);
}
}
}
}
网友评论