美文网首页
2021-07-13 基于aws链路追踪方案

2021-07-13 基于aws链路追踪方案

作者: 江江江123 | 来源:发表于2021-07-13 12:09 被阅读0次

    链路追踪方案

    背景

    目前日志使用aop+logback生成。
    存在问题
    1.高并发下日志大量生成以至于无法确认哪条日志对应哪个用户的操作;
    2.日志生成过于零散缺乏统一的生成格式;
    所以目前尝试引入链路追踪的方式来定位异常。

    方案选择

    服务分类 cat zipkin pinpoint skywalking aws cloudwatch
    依赖 ·Java 6,7,8
    ·Maven 3.2.3
    ·mysql5.6
    ·Linux 2.6以及之上(2.6内核才可以支持epoll)
    ·Java 6,7,8
    ·Maven3.2+
    ·rabbitMQ
    ·Java 6,7,8
    ·maven3+
    ·Hbase0.94+
    ·Java 6,7,8
    ·maven3.0+
    ·nodejs
    ·zookeeper
    ·elasticsearch
    ·Java 6,7,8
    ·maven3.0+
    ·aws
    实现方式 代码埋点(拦截器,注解,过滤器等) 拦截请求,发送(HTTP,mq)数据至zipkin服务 java探针,字节码增强 java探针,字节码增强 java aop,fluent-bit传输
    存储选择 mysql , hdfs in-memory , mysql , Cassandra , Elasticsearch HBase elasticsearch , H2 cloudwatch
    通信方式 http , MQ thrift GRPC
    MQ监控 不支持 不支持 不支持 支持 不支持
    全局调用统计 支持 不支持 支持 支持 支持
    trace查询 不支持 支持 不支持 支持 支持
    报警 支持 不支持 支持 支持 支持
    JVM监控 不支持 不支持 支持 支持 不支持
    优点 功能完善 spring-cloud-sleuth可以很好的集成zipkin , 代码无侵入,集成非常简单 , 社区更加活跃。对外提供有query接口,更加容易二次开发 完全无侵入, 仅需修改启动方式,界面完善,功能细致。 全无侵入,界面完善,支持应用拓扑图及单个调用链查询。功能比较完善(zipkin + pinpoint) 可定制性强,可根据需求自定义任何功能
    缺点 ·代码侵入性较强,需要埋点
    ·文档比较混乱,文档与发布版本的符合性较低,需要依赖点评私服 (或者需要把他私服上的jar手动下载下来,然后上传到我们的私服上去)。
    ·默认使用的是http请求向zipkin上报信息,耗性能。
    ·跟sleuth结合可以使用rabbitMQ的方式异步来做,增加了复杂度,需要引入rabbitMQ 。·数据分析比较简单。
    ·不支持查询单个调用链, 对外表现的是整个应用的调用生态。
    ·二次开发难度较高
    ·版本之前BUG较多 ,网上反映兼容性较差
    ·3.2新版本的反映情况较少依赖较多。
    ·依赖于aws
    ·有一定学习成本
    文档 网上资料较少,仅官网提供的文档,比较乱 文档完善 文档完善 文档完善 文档完善,但过于老旧,查询不方便
    使用公司 大众点评, 携程, 陆金所,同程旅游,猎聘网 twitter naver 华为软件开发云、天源迪科、当当网、京东金融

    我们当前服务是单体服务,为此引入第三方过于复杂。
    其次当前服务的监控依赖于aws的cloudwatch,目前通过fluent-bit将pods中的日志上传处理,暂时不希望引入额外的监控系统
    最终决定通过拦截器+修改日志生成方案来解决。

    详细设计

    1. aop日志生成
      之前的日志生成过于零散,无法通过cloudwatch有效监控,报警,所以本次引入request 与 error request 类,通过 aop环绕方法统一打印日志
           @Data
           public class RequestInfo {
               private String ip;
               private String url;
               private String httpMethod;
               private String classMethod;
               private Object requestParams;
               private Object result;
               private Long timeCost;
           }
       
           @Data
           public class RequestErrorInfo {
               private String ip;
               private String url;
               private String httpMethod;
               private String classMethod;
               private Object requestParams;
               private RuntimeException exception;
           }
      
      aop环绕方法,异常方法实现
           @Around("webLogPointcut()")
           public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
               long start = System.currentTimeMillis();
               ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
               HttpServletRequest request = attributes.getRequest();
               Object result = proceedingJoinPoint.proceed();
               RequestInfo requestInfo = new RequestInfo();
               requestInfo.setIp(request.getRemoteAddr());
               requestInfo.setUrl(request.getRequestURL().toString());
               requestInfo.setHttpMethod(request.getMethod());
               requestInfo.setClassMethod(String.format("%s.%s", proceedingJoinPoint.getSignature().getDeclaringTypeName(),
                       proceedingJoinPoint.getSignature().getName()));
               requestInfo.setRequestParams(getRequestParamsByProceedingJoinPoint(proceedingJoinPoint));
               requestInfo.setResult(result);
               requestInfo.setTimeCost(System.currentTimeMillis() - start);
               log.info("Request Info: {}", JSON.toJSONString(requestInfo));
               return result;
           }
      
           /**
            * 异常通知:
            * 1. 在目标方法非正常结束,发生异常或者抛出异常时执行
            * 1. 在异常通知中设置异常信息,并将其保存
            *
            * @param throwable
            */
           @AfterThrowing(value = "webLogPointcut()", throwing = "throwable")
           public void doAfterThrowing(JoinPoint joinPoint, RuntimeException throwable) {
               // 保存异常日志记录
               ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
               HttpServletRequest request = attributes.getRequest();
               RequestErrorInfo requestErrorInfo = new RequestErrorInfo();
               requestErrorInfo.setIp(request.getRemoteAddr());
               requestErrorInfo.setUrl(request.getRequestURL().toString());
               requestErrorInfo.setHttpMethod(request.getMethod());
               requestErrorInfo.setClassMethod(String.format("%s.%s", joinPoint.getSignature().getDeclaringTypeName(),
                       joinPoint.getSignature().getName()));
               requestErrorInfo.setRequestParams(getRequestParamsByJoinPoint(joinPoint));
               requestErrorInfo.setException(throwable);
               log.error("Error Request Info: {}", JSON.toJSONString(requestErrorInfo));
           }
           
          private Map<String, Object> getRequestParamsByProceedingJoinPoint(ProceedingJoinPoint proceedingJoinPoint) {
               //参数名
               String[] paramNames = ((MethodSignature) proceedingJoinPoint.getSignature()).getParameterNames();
               //参数值
               Object[] paramValues = proceedingJoinPoint.getArgs();
       
               return buildRequestParam(paramNames, paramValues);
           }
       
           private Map<String, Object> getRequestParamsByJoinPoint(JoinPoint joinPoint) {
               //参数名
               String[] paramNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
               //参数值
               Object[] paramValues = joinPoint.getArgs();
       
               return buildRequestParam(paramNames, paramValues);
           }
       
           private Map<String, Object> buildRequestParam(String[] paramNames, Object[] paramValues) {
               Map<String, Object> requestParams = new HashMap<>();
               for (int i = 0; i < paramNames.length; i++) {
                   Object value = paramValues[i];
       
                   //如果是文件对象
                   if (value instanceof MultipartFile) {
                       MultipartFile file = (MultipartFile) value;
                       value = file.getOriginalFilename();  //获取文件名
                   }
       
                   requestParams.put(paramNames[i], value);
               }
       
               return requestParams;
           }
      
    2. 添加traceId (存在问题 已在下方修复)
      通过ThreadContext存储traceId,traceId来源可以是前端,如果前端不包含则通过雪花算法生成一个id 2.1 添加traceId拦截器
        @Component
        public class LogInterceptor implements HandlerInterceptor {
           private final static String TRACE_ID = "traceId";
       
           @Override
           public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
               String traceId = request.getHeader("traceId");
               if (StringUtils.isEmpty(traceId)) {
                   //traceId = java.util.UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
                   traceId = SnowflakeIdUtils.next().toString();
               }
               ThreadContext.put("traceId", traceId);
               return true;
           }
       
           @Override
           public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView)
                   throws Exception {
           }
       
           @Override
           public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
                   throws Exception {
               ThreadContext.remove(TRACE_ID);
           }
        }
      
      2.2 项目中引入该拦截器
       @Configuration
       public class WebConfig implements WebMvcConfigurer {
           @Autowired
           LogInterceptor logInterceptor;
       
           @Override
           public void addInterceptors(InterceptorRegistry registry) {
               registry.addInterceptor(logInterceptor);
           }
       }
      
    3. 编辑logback 日志生成方式
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
           <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
               <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
               <pattern>[TRACEID:%X{traceId}] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
           </encoder>
       </appender>
      
    4. aws eks fluent-bit cloudwatch 搭建监控,报警平台
      通过cloudwatch 平台统一业务日志,并删选异常日志,日志提供所属容器及traceId等内容
       filter @logStream like 'test' //test 表示模糊查询项目
       | fields kubernetes.pod_name  //显示日志所属容器
       | parse  log "[TRACEID:*] * [*] * * - *" as @traceId,@time, @thread,@level,@source,@msg //解析logback日志
       | filter @msg like "Request"  //筛选request and error request 日志
       | parse @msg "Request Info: {\"classMethod\":\"*\",\"httpMethod\":\"*\",\"ip\":\"*\",\"requestParams\":*,\"result\":*,\"timeCost\":*,\"url\":\"*\"" as classMethod,httpMethod,ip,requestParams,result,timeCost,url //解析request log
       | parse result "{\"code\":\"*\",*}" as  resultCode ,other //解析result code 查询业务异常
      

    备注

    一.存在问题

    使用该方案生成日志对controller层的参数有限制,不能引入request,response等参数,如果一定需要使用,可以通过spring容器内读取。

    二.在使用这套日志体系后发现新的问题,由于项目中引入了基于netty的websocket,不受aop的日志记录影响,所以参考以上方案简单实现了类似的功能。 如:
    1. 在@BeforeHandshake中引入traceId;
    2. 创建日志类
        @Data
           public class SocketRequestInfo {
               private String uid;
               private String roomId;
               private Integer code;
               private Long timeCost;
           }
       
           @Data
           public class SocketRequestErrorInfo {
               private String uid;
               private String roomId;
               private RuntimeException exception;
           }
    
    1. 在@OnBinary,@OnError 中分别打印日志

    参考

    1. 分布式链路追踪技术对比
    2. 用好 Spring AOP,天降大锅从容应对!
    3. Set up Fluent Bit as a DaemonSet to send logs to CloudWatch Logs PDF
    4. CloudWatch Logs Insights query syntax

    2022-04-14 更新

    tarceId: 当traceId使用ThreadLocal时,在rpc调用出现一个问题。
    环境:k8s springboot okhttp retrofit2
    当使用retrofit2 调用下游业务时发现thraceId丢失。
    之前因为业务紧,选择了一个非常low的方案,在所有api接口上添加@Header Sring traceId
    今天在在学习新知识时偶然发现更优雅的解决方案,特此记录。

    新增traceUtils

    核心: 用MDC替换ThreadLcoal

    public class TraceUtils {
        public final static String TRACE_ID = "traceId";
    
        public static String getTraceId() {
            return MDC.get(TRACE_ID);
        }
    
        public static void addTraceId(String traceId) {
            if (StringUtils.isEmpty(traceId)) {
                MDC.put(TRACE_ID, SnowflakeIdUtils.next().toString());
            } else {
                MDC.put(TRACE_ID, traceId);
            }
    
        }
    
        public static void clear() {
            MDC.clear();
        }
    }
    

    修改LogInterceptor

    @Component
    public class LogInterceptor implements HandlerInterceptor {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            String traceId = request.getHeader(TraceUtils.TRACE_ID);
            TraceUtils.addTraceId(traceId);
            return true;
        }
    
        @Override
        public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView)
                throws Exception {
        }
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
                throws Exception {
            TraceUtils.clear();
        }
    }
    

    新增OkHttpInterceptor

    public class OkHttpTraceIdInterceptor implements Interceptor {
        @Override
        public Response intercept(Chain chain) throws IOException {
            String traceId = TraceUtils.getTraceId();
            Request request = null;
            if (traceId != null) {
                //添加请求体
                request = chain.request().newBuilder().addHeader(TraceUtils.TRACE_ID, traceId).build();
            }
            Response originResponse = chain.proceed(request);
            return originResponse;
        }
    }
    

    修改okHttpUtils

    public class OkHttpUtils {
        private final static OkHttpClient okHttpClient;
    
        static {
            okHttpClient = new OkHttpClient.Builder()
                    //.sslSocketFactory(sslSocketFactory(), x509TrustManager())
                    .retryOnConnectionFailure(true)
                    .connectionPool(pool())
                    .connectTimeout(60, TimeUnit.SECONDS)
                    .readTimeout(60, TimeUnit.SECONDS)
                    .writeTimeout(60, TimeUnit.SECONDS)
                    .addInterceptor(new OkHttpTraceIdInterceptor())
                    .build();
        }
        public static OkHttpClient getOkHttpClient() {
            return okHttpClient;
        }
        public static ConnectionPool pool() {
            return new ConnectionPool(200, 5, TimeUnit.MINUTES);
        }
    }
    

    当使用MDH代替ThreadLocal ,终于将大面积接口调用的TraceId参数去掉了。

    相关文章

      网友评论

          本文标题:2021-07-13 基于aws链路追踪方案

          本文链接:https://www.haomeiwen.com/subject/yfajpltx.html