分布式traceId

作者: 雪飘千里 | 来源:发表于2023-05-22 15:17 被阅读0次

    1、常见场景

    场景一:工作中根据日志排查问题时我们经常想看到某个请求下的所有日志,可是由于生产环境并发很大,服务被调过于频繁,日志刷新太快每个请求之间的日志并不连贯,互相穿插,如果在打印日志时没有为日志增加一个唯一标识,是没法分辨出哪些日志是哪个请求打印,会影响到测试联调、线上问题排查的效率;

    场景二:我们想知道一个请求中所有和该请求相关的链路日志,尤其是涉及到多个微服务时,此时也需要为日志增加一个唯一标识。通常可以使用UUID或者其它雪花算法等作为唯一标识;

    基于以上场景,我们可以为每个请求设置一个traceId,这个请求整个链路公用同一个traceId,然后将日志收集到统一日志平台通过日志关键字找出traceId,再根据traceId,找出整个链路的请求过程,甚至还可以与分布式链路框架skywalking结合,分析链路的性能。

    2、实现原理

    2.1 MDC

    MDC是(Mapped Diagnostic Context,映射调试上下文)是日志框架 log4j 和 logback 支持的一种方便在多线程条件下记录追踪日志的功能。

    2.2 MDC原理

    MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。

    MDC 底层最终使用的是 ThreadLocal 实现。

    2.3 MDC使用

    API 说明:

    • clear():移除所有MDC
    • get (String key):获取当前线程 MDC 中指定 key 的值
    • getCopyOfContextMap():将MDC从内存获取出来,再传给线程
    • put(String key, Object o):往当前线程的 MDC 中存入指定的键值对
    • remove(String key):删除当前线程 MDC 中指定的键值对
    • setContextMap():将父线程的MDC内容传给子线程
    importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;
    importorg.slf4j.MDC;importjava.util.UUID;
    
    /*** MDC快速入门示例*/
    
    public classSimpleMDC {
    
        private static final Logger logger = LoggerFactory.getLogger(SimpleMDC.class);
        public static final String REQ_ID = "REQ_ID";
    
        public static voidmain(String[] args) {
    
            MDC.put(REQ_ID, UUID.randomUUID().toString());
    
            logger.info("开始调用服务A,进行业务处理");
    
            logger.info("业务处理完毕,可以释放空间了,避免内存泄露");
    
            MDC.remove(REQ_ID);
    
            logger.info("REQ_ID 还有吗?{}", MDC.get(REQ_ID) != null);
        }
    }
    
    

    2.4 MDC使用场景

    • 在 WEB 应用中,如果想在日志中输出请求用户 IP 地址、请求 URL、统计耗时等等,MDC 基本都能支撑;

    • 在 WEB 应用中,如果能画出用户的请求到响应整个过程,势必会快速定位生产问题,那么借助 MDC 来保存用户请求时产生的 reqId,当请求完成后,再将这个 reqId 进行移除,这么一来通过 grep reqId 就能轻松 get 整个请求流程的日志轨迹;

    • 在微服务盛行的当下,链路跟踪是个难题,而借助 MDC 去埋点,巧妙实现链路跟踪应该不是问题。

    3、分布式traceId实现方案

    1、 引入log4j的同时需要注意去除冲突包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <!-- 排除自带的logback依赖 -->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
    

    2、resources 文件夹下添加[log4j.xml]配置文件

    <appender name="trace" class="org.apache.log4j.DailyRollingFileAppender">
        <param name="File" value="D://logs//trace.log"/>
        <param name="DatePattern" value="'.'yyyy-MM-dd"/>
        <param name="threshold" value="info"/>
        <param name="append" value="true"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] - [%X{traceId}] %-6p%c:%L - %m%n"/>
        </layout>
    </appender>
     
    <root>
        <level value="info"/>
        <appender-ref ref="trace"/>
    </root>
    

    3、添加[MDCUtils]自定义工具类

    public class MDCUtils {
     
        /**
         * [获取 traceId]
         * @return java.lang.String
         **/
        public static String mdc(){
            RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = (HttpServletRequest) requestAttributes
                    .resolveReference(RequestAttributes.REFERENCE_REQUEST);
     
            String traceId;
            String traceIdKey = "traceId";
            if (request.getHeader(traceIdKey) == null) {
                traceId = UUID.randomUUID().toString();
            } else {
                traceId = request.getHeader(traceIdKey);
            }
            return traceId;
        }
     
    }
    

    4、添加拦截器[LogInterceptor]为每一个请求头添加[traceId]

    public class LogInterceptor implements HandlerInterceptor {
     
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            String traceIdKey = "traceId";
            String traceId = MDCUtils.mdc();
            request.setAttribute(traceIdKey, traceId);
            MDC.clear();
            MDC.put(traceIdKey, traceId);
            return true;
        }
    }
    

    5、注册拦截器

    @Configuration
    public class WebMvcConfig implements WebMvcConfigurer {
     
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            registry.addInterceptor(new LogInterceptor()).addPathPatterns("/test/*");
        }
    }
    

    6、在每次调用外部接口的httpclient地方添加请求头[traceId]

    public class HttpClientHelper {
     
        public static String sendPost(String url, LinkedHashMap<String, Object> paramMap, Map<String, Object> headMap) {
            // 获取连接客户端工具
            CloseableHttpClient httpClient = HttpClients.createDefault();
            String entityStr = null;
            CloseableHttpResponse response = null;
     
            try {
                // 创建POST请求对象
                HttpPost httpPost = new HttpPost(url);
                UrlEncodedFormEntity entityParam = null;
     
                /*
                 * 添加请求参数
                 */
                // 创建请求参数
                if (!CollectionUtils.isEmpty(paramMap)) {
                    List<NameValuePair> paramertersList = new LinkedList<>();
     
                    Set<String> keys = paramMap.keySet();
                    for (String s : keys) {
                        String key = String.valueOf(s);
                        String value = ObjectUtils.isEmpty(paramMap.get(key)) ? null : paramMap.get(key).toString();
                        if (ObjectUtils.isNotEmpty(value)) {
                            BasicNameValuePair param1 = new BasicNameValuePair(key, value);
                            paramertersList.add(param1);
                        }
                    }
                    // 使用URL实体转换工具
                    entityParam = new UrlEncodedFormEntity(paramertersList, "UTF-8");
                }
     
                httpPost.setEntity(entityParam);
     
                if (headMap != null) {
                    for (Map.Entry<String, Object> entry : headMap.entrySet()) {
                        if (entry.getValue() != null) {
                            httpPost.addHeader(entry.getKey(), entry.getValue().toString());
                        }
                    }
                }
     
                String traceId = MDC.get("traceId");
                httpPost.addHeader("traceId", traceId);
                httpPost.addHeader("Accept", "*/*");
                httpPost.addHeader("Connection", "Keep-Alive");
                httpPost.addHeader("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.7.6)");
                // 传输的类型
                httpPost.addHeader("Content-Type", "application/x-www-form-urlencoded");
     
                // 执行请求
                response = httpClient.execute(httpPost);
                // 获得响应的实体对象
                HttpEntity entity = response.getEntity();
                // 使用Apache提供的工具类进行转换成字符串
                entityStr = EntityUtils.toString(entity, "UTF-8");
     
                // 此处获取所有的响应头信息并进行打印
                System.out.println(Arrays.toString(response.getAllHeaders()));
            } catch (ClientProtocolException e) {
                System.err.println("Http协议出现问题");
                e.printStackTrace();
            } catch (ParseException e) {
                System.err.println("解析错误");
                e.printStackTrace();
            } catch (IOException e) {
                System.err.println("IO异常");
                e.printStackTrace();
            } finally {
                // 释放连接
                if (null != response) {
                    try {
                        response.close();
                        httpClient.close();
                    } catch (IOException e) {
                        System.err.println("释放连接出错");
                        e.printStackTrace();
                    }
                }
            }
            return entityStr;
        }
     
    }
    

    4、多线程间使用

    MDC异步线程间传递:
    MDC的put时,子线程在创建的时候会把父线程中的inheritableThreadLocals变量设置到子线程的inheritableThreadLocals中,而MDC内部是用InheritableThreadLocal实现的,所以会把父线程中的上下文带到子线程中
    但在线程池中,由于线程会被重用,但是线程本身只会初始化一次,所以之后重用线程的时候,就不会进行初始化操作了,也就不会有父线程inheritableThreadLocals拷贝到子线程中的过程了,这个时候如果还想传递父线程的上下文的话,就要使用getCopyOfContextMap方法

    4.1 MDC工具类

    定义MDC工具类,支持RunnableCallable两种,目的就是为了把父线程的traceId设置给子线程

    import org.slf4j.MDC;
    import org.springframework.util.CollectionUtils;
    
    import java.util.Map;
    import java.util.concurrent.Callable;
    
    /**
     * @Description 封装MDC用于向线程池传递
     */
    public class MDCUtil {
        public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
            return () -> {
                if (CollectionUtils.isEmpty(context)) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    return callable.call();
                } finally {//清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                    MDC.clear();
                }
            };
        }
    
     public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return () -> {
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                setTraceIdIfAbsent();
                try {
                    runnable.run();
                } finally {
                    MDC.clear();
                }
            };
        }
    
        public static void setMDCContextMap(final Map<String, String> context) {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
        }
    
    }
    
    

    4.2 Spring ThreadPoolTaskExecutor线程池使用

    配置线程池

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolMdcWrapper();
        //核心线程数,默认为1
        taskExecutor.setCorePoolSize(1);
        //最大线程数,默认为Integer.MAX_VALUE
        taskExecutor.setMaxPoolSize(200);
        //队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
        taskExecutor.setQueueCapacity(2000);
        //线程池维护线程所允许的空闲时间,默认为60s
        taskExecutor.setKeepAliveSeconds(60);
        //线程池对拒绝任务(无线程可用)的处理策略
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 初始化线程池
        taskExecutor.initialize();
        return  taskExecutor;
    }
    

    继承ThreadPoolTaskExecutor

    public class ThreadPoolMdcWrapper extends ThreadPoolTaskExecutor {
    
        public ThreadPoolMdcWrapper() {
    
        }
    
        @Override
        public void execute(Runnable task) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    
        @Override
        public void execute(Runnable task, long startTimeout) {
            super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    
        @Override
        public ListenableFuture<?> submitListenable(Runnable task) {
            return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    
        @Override
        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
            return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
        }
    }
    
    

    4.2 使用ExecutorCompletionService方式

    当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。

    ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。

    ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。

    ExecutorCompletionService 相比之前 Future 相比 ,提供了一个通知机制,将结果统一到一个队列,当前提交任务不会阻塞获取,从另一个队列中阻塞获取。

    /**
     * 使用MDC传递traceId
     */
    public class Demo {
    
        @Autowired
        private ThreadPoolExecutor threadPoolExecutor;
    
        public void demo() {
            ExecutorCompletionService ecs = new ExecutorCompletionService(threadPoolExecutor);
            ecs.submit(MDCUtil.wrap(new TestMDC(), MDC.getCopyOfContextMap()));
        }
        
        class TestMDC implements Callable {
            @Override
            public Object call() throws Exception {
                // TODO 代码逻辑
                return null;
            }
        }
    }
    

    2.3.3 使用CompletableFuture方式

    /**
     * 使用MDC传递traceId
     */
    public class Demo {
    
        @Autowired
        private ThreadPoolExecutor threadPoolExecutor;
    
        private CompletableFuture<Result> test() {
        
            Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
            
            return CompletableFuture.supplyAsync(() -> {
            
                // 必须在打印日志前设置
                MDCUtil.setMDCContextMap(copyOfContextMap);
                //MDC.put("subTraceId",''); //如果需要对子线程进行加线程跟踪号,可在此处设定
                // TODO 业务逻辑
                return new Result();
                
            }, threadPoolExecutor).exceptionally(new Function<Throwable, Result>() {
                /**捕捉异常,不会导致整个流程中断**/
                @Override
                public Result apply(Throwable throwable) {
                    log.error("线程[{}]发生了异常[{}], 继续执行其他线程", Thread.currentThread().getName(), throwable.getMessage());
                    return null;
                }
            });
        }
    }
    

    相关文章

      网友评论

        本文标题:分布式traceId

        本文链接:https://www.haomeiwen.com/subject/oujznrtx.html