1、常见场景
场景一:工作中根据日志排查问题时我们经常想看到某个请求下的所有日志,可是由于生产环境并发很大,服务被调过于频繁,日志刷新太快每个请求之间的日志并不连贯,互相穿插,如果在打印日志时没有为日志增加一个唯一标识,是没法分辨出哪些日志是哪个请求打印,会影响到测试联调、线上问题排查的效率;
场景二:我们想知道一个请求中所有和该请求相关的链路日志,尤其是涉及到多个微服务时,此时也需要为日志增加一个唯一标识。通常可以使用UUID或者其它雪花算法等作为唯一标识;
基于以上场景,我们可以为每个请求设置一个traceId,这个请求整个链路公用同一个traceId,然后将日志收集到统一日志平台通过日志关键字找出traceId,再根据traceId,找出整个链路的请求过程,甚至还可以与分布式链路框架skywalking结合,分析链路的性能。
2、实现原理
2.1 MDC
MDC是(Mapped Diagnostic Context,映射调试上下文)是日志框架 log4j 和 logback 支持的一种方便在多线程条件下记录追踪日志的功能。
2.2 MDC原理
MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。
MDC 底层最终使用的是 ThreadLocal 实现。
2.3 MDC使用
API 说明:
-
clear()
:移除所有MDC -
get (String key)
:获取当前线程 MDC 中指定 key 的值 -
getCopyOfContextMap()
:将MDC从内存获取出来,再传给线程 -
put(String key, Object o)
:往当前线程的 MDC 中存入指定的键值对 -
remove(String key)
:删除当前线程 MDC 中指定的键值对 -
setContextMap()
:将父线程的MDC内容传给子线程
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;
importorg.slf4j.MDC;importjava.util.UUID;
/*** MDC快速入门示例*/
public classSimpleMDC {
private static final Logger logger = LoggerFactory.getLogger(SimpleMDC.class);
public static final String REQ_ID = "REQ_ID";
public static voidmain(String[] args) {
MDC.put(REQ_ID, UUID.randomUUID().toString());
logger.info("开始调用服务A,进行业务处理");
logger.info("业务处理完毕,可以释放空间了,避免内存泄露");
MDC.remove(REQ_ID);
logger.info("REQ_ID 还有吗?{}", MDC.get(REQ_ID) != null);
}
}
2.4 MDC使用场景
-
在 WEB 应用中,如果想在日志中输出请求用户 IP 地址、请求 URL、统计耗时等等,MDC 基本都能支撑;
-
在 WEB 应用中,如果能画出用户的请求到响应整个过程,势必会快速定位生产问题,那么借助 MDC 来保存用户请求时产生的 reqId,当请求完成后,再将这个 reqId 进行移除,这么一来通过 grep reqId 就能轻松 get 整个请求流程的日志轨迹;
-
在微服务盛行的当下,链路跟踪是个难题,而借助 MDC 去埋点,巧妙实现链路跟踪应该不是问题。
3、分布式traceId实现方案
1、 引入log4j的同时需要注意去除冲突包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<!-- 排除自带的logback依赖 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
2、resources 文件夹下添加[log4j.xml]配置文件
<appender name="trace" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="D://logs//trace.log"/>
<param name="DatePattern" value="'.'yyyy-MM-dd"/>
<param name="threshold" value="info"/>
<param name="append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] - [%X{traceId}] %-6p%c:%L - %m%n"/>
</layout>
</appender>
<root>
<level value="info"/>
<appender-ref ref="trace"/>
</root>
3、添加[MDCUtils]自定义工具类
public class MDCUtils {
/**
* [获取 traceId]
* @return java.lang.String
**/
public static String mdc(){
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
HttpServletRequest request = (HttpServletRequest) requestAttributes
.resolveReference(RequestAttributes.REFERENCE_REQUEST);
String traceId;
String traceIdKey = "traceId";
if (request.getHeader(traceIdKey) == null) {
traceId = UUID.randomUUID().toString();
} else {
traceId = request.getHeader(traceIdKey);
}
return traceId;
}
}
4、添加拦截器[LogInterceptor]为每一个请求头添加[traceId]
public class LogInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String traceIdKey = "traceId";
String traceId = MDCUtils.mdc();
request.setAttribute(traceIdKey, traceId);
MDC.clear();
MDC.put(traceIdKey, traceId);
return true;
}
}
5、注册拦截器
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LogInterceptor()).addPathPatterns("/test/*");
}
}
6、在每次调用外部接口的httpclient地方添加请求头[traceId]
public class HttpClientHelper {
public static String sendPost(String url, LinkedHashMap<String, Object> paramMap, Map<String, Object> headMap) {
// 获取连接客户端工具
CloseableHttpClient httpClient = HttpClients.createDefault();
String entityStr = null;
CloseableHttpResponse response = null;
try {
// 创建POST请求对象
HttpPost httpPost = new HttpPost(url);
UrlEncodedFormEntity entityParam = null;
/*
* 添加请求参数
*/
// 创建请求参数
if (!CollectionUtils.isEmpty(paramMap)) {
List<NameValuePair> paramertersList = new LinkedList<>();
Set<String> keys = paramMap.keySet();
for (String s : keys) {
String key = String.valueOf(s);
String value = ObjectUtils.isEmpty(paramMap.get(key)) ? null : paramMap.get(key).toString();
if (ObjectUtils.isNotEmpty(value)) {
BasicNameValuePair param1 = new BasicNameValuePair(key, value);
paramertersList.add(param1);
}
}
// 使用URL实体转换工具
entityParam = new UrlEncodedFormEntity(paramertersList, "UTF-8");
}
httpPost.setEntity(entityParam);
if (headMap != null) {
for (Map.Entry<String, Object> entry : headMap.entrySet()) {
if (entry.getValue() != null) {
httpPost.addHeader(entry.getKey(), entry.getValue().toString());
}
}
}
String traceId = MDC.get("traceId");
httpPost.addHeader("traceId", traceId);
httpPost.addHeader("Accept", "*/*");
httpPost.addHeader("Connection", "Keep-Alive");
httpPost.addHeader("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.7.6)");
// 传输的类型
httpPost.addHeader("Content-Type", "application/x-www-form-urlencoded");
// 执行请求
response = httpClient.execute(httpPost);
// 获得响应的实体对象
HttpEntity entity = response.getEntity();
// 使用Apache提供的工具类进行转换成字符串
entityStr = EntityUtils.toString(entity, "UTF-8");
// 此处获取所有的响应头信息并进行打印
System.out.println(Arrays.toString(response.getAllHeaders()));
} catch (ClientProtocolException e) {
System.err.println("Http协议出现问题");
e.printStackTrace();
} catch (ParseException e) {
System.err.println("解析错误");
e.printStackTrace();
} catch (IOException e) {
System.err.println("IO异常");
e.printStackTrace();
} finally {
// 释放连接
if (null != response) {
try {
response.close();
httpClient.close();
} catch (IOException e) {
System.err.println("释放连接出错");
e.printStackTrace();
}
}
}
return entityStr;
}
}
4、多线程间使用
MDC
异步线程间传递:
用MDC
的put时,子线程在创建的时候会把父线程中的inheritableThreadLocals
变量设置到子线程的inheritableThreadLocals
中,而MDC
内部是用InheritableThreadLocal
实现的,所以会把父线程中的上下文带到子线程中
但在线程池中,由于线程会被重用,但是线程本身只会初始化一次,所以之后重用线程的时候,就不会进行初始化操作了,也就不会有父线程inheritableThreadLocals
拷贝到子线程中的过程了,这个时候如果还想传递父线程的上下文的话,就要使用getCopyOfContextMap
方法
4.1 MDC工具类
定义MDC工具类,支持Runnable
和Callable
两种,目的就是为了把父线程的traceId设置给子线程
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @Description 封装MDC用于向线程池传递
*/
public class MDCUtil {
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return callable.call();
} finally {//清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
setTraceIdIfAbsent();
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
public static void setMDCContextMap(final Map<String, String> context) {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
}
}
4.2 Spring ThreadPoolTaskExecutor线程池使用
配置线程池
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolMdcWrapper();
//核心线程数,默认为1
taskExecutor.setCorePoolSize(1);
//最大线程数,默认为Integer.MAX_VALUE
taskExecutor.setMaxPoolSize(200);
//队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
taskExecutor.setQueueCapacity(2000);
//线程池维护线程所允许的空闲时间,默认为60s
taskExecutor.setKeepAliveSeconds(60);
//线程池对拒绝任务(无线程可用)的处理策略
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
继承ThreadPoolTaskExecutor
public class ThreadPoolMdcWrapper extends ThreadPoolTaskExecutor {
public ThreadPoolMdcWrapper() {
}
@Override
public void execute(Runnable task) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public void execute(Runnable task, long startTimeout) {
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}
4.2 使用ExecutorCompletionService方式
当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。
ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。
ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。
ExecutorCompletionService 相比之前 Future 相比 ,提供了一个通知机制,将结果统一到一个队列,当前提交任务不会阻塞获取,从另一个队列中阻塞获取。
/**
* 使用MDC传递traceId
*/
public class Demo {
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
public void demo() {
ExecutorCompletionService ecs = new ExecutorCompletionService(threadPoolExecutor);
ecs.submit(MDCUtil.wrap(new TestMDC(), MDC.getCopyOfContextMap()));
}
class TestMDC implements Callable {
@Override
public Object call() throws Exception {
// TODO 代码逻辑
return null;
}
}
}
2.3.3 使用CompletableFuture方式
/**
* 使用MDC传递traceId
*/
public class Demo {
@Autowired
private ThreadPoolExecutor threadPoolExecutor;
private CompletableFuture<Result> test() {
Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
return CompletableFuture.supplyAsync(() -> {
// 必须在打印日志前设置
MDCUtil.setMDCContextMap(copyOfContextMap);
//MDC.put("subTraceId",''); //如果需要对子线程进行加线程跟踪号,可在此处设定
// TODO 业务逻辑
return new Result();
}, threadPoolExecutor).exceptionally(new Function<Throwable, Result>() {
/**捕捉异常,不会导致整个流程中断**/
@Override
public Result apply(Throwable throwable) {
log.error("线程[{}]发生了异常[{}], 继续执行其他线程", Thread.currentThread().getName(), throwable.getMessage());
return null;
}
});
}
}
网友评论