美文网首页
如何复制Pinpoint中一条调用链的完整数据

如何复制Pinpoint中一条调用链的完整数据

作者: Tankilo | 来源:发表于2020-03-28 19:48 被阅读0次

    如何复制Pinpoint中一条调用链的完整数据

    如果你熟悉Pinpoint的话,你应该知道一条调用链包含哪些数据
    在这里我指的是com.navercorp.pinpoint.web.controller.BusinessTransactionController#transactionInfo方法中查询会涉及到的HBase数据。
    为什么会产生这个需求呢,HBase中的数据都是配置了TTL的,过一段时间会被清理,你可能就和要这条骨骼惊奇的调用链说拜拜了。
    如果能够离线或者保存另外的HBase里,可以更快的复现场景进行调试和排查。


    image.png

    先看一下到底会查哪些表吧。

    • TraceV2
    • ApiMetaData
    • StringMetaData
    • SqlMetaData_Ver2

    Pinpoint在构造调用链界面需要的信息的时候,TraceV2是一行数据,用事务号就能查询出来。
    然后遍历这行数据里的Span和SpanEvent,使用包含的apiId,stringId,sqlId去后三个表对应查询所关联的数据。
    后面三个,一个复杂的调用链会查询很多次。怎么才能知道呢?

    下面只是记录一下本地试验的方法,仅在测试环境中使用。

    利用Spring AOP 将hbase查询结果 插入到另外的hbase中

    我想了下,如果在hbase查询的时候进行AOP拦截,并且把数据发送到另外一个hbase的话,这样不就能把一条调用链的数据给剥离出来了么?
    我把将查询出来的数据插入到别的hbase的过程叫做逆转

    我们要寻找一些合适的spring bean,因为spring的aop只能作用在spring创建的对象上。

    首先我注意到 com.navercorp.pinpoint.common.hbase.RowMapper#mapRow

    public interface RowMapper<T> {
    
        T mapRow(Result result, int rowNum) throws Exception;
    }
    

    第一个参数org.apache.hadoop.hbase.client.Result包含了查询出来的一行数据,现在就差个表名了。

    仔细看pinpoint的代码,每次执行hbase操作前都会调用com.navercorp.pinpoint.common.hbase.TableDescriptor#getTableName获取表名。
    这样设置一个线程上下文(https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java),就能将表名和多行数据完整联系在一起了。

    最后线程上下文里面还需要设置一个是否逆转查询数据的标志。
    对于查询单条调用链来说就是com.navercorp.pinpoint.web.service.SpanService#selectSpan方法进入的时候开启标志。

    实现

    首先仿照shiro弄个线程上下文。

    package com.navercorp.pinpoint.web.dao;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author tankilo
     * https://github.com/apache/shiro/blob/master/core/src/main/java/org/apache/shiro/util/ThreadContext.java
     */
    public final class ThreadContext {
        private ThreadContext() {
    
        }
    
        public static final String REVERSE = "REVERSE";
        public static final String TABLE_NAME = "TABLE_NAME";
        private static ThreadLocal<Map<String, Object>> resources = new InheritableThreadLocal<Map<String, Object>>() {
            @Override
            protected Map<String, Object> initialValue() {
                return new HashMap<>(8);
            }
        };
    
        public static void put(String key, Object value) {
            if (key == null) {
                throw new IllegalArgumentException("key cannot be null");
            }
    
            if (value == null) {
                remove(key);
                return;
            }
            ensureResourcesInitialized();
            resources.get().put(key, value);
        }
    
        private static void ensureResourcesInitialized() {
            if (resources.get() == null) {
                resources.set(new HashMap<>(8));
            }
        }
    
        public static void remove(String key) {
            Map<String, Object> map = resources.get();
            if (map != null) {
                map.remove(key);
            }
        }
    
        public static Object get(String key) {
            Map<String, Object> map = resources.get();
            if (map != null) {
                return map.get(key);
            } else {
                return null;
            }
        }
    
        private static Object getValue(Object key) {
            Map<String, Object> perThreadResources = resources.get();
            return perThreadResources != null ? perThreadResources.get(key) : null;
        }
    
        private static Boolean getBoolean(String key, Boolean defaultValue) {
            Object value = getValue(key);
            if (null != value) {
                return Boolean.valueOf(value.toString());
            }
            return defaultValue;
        }
    
        private static String getString(String key) {
            Object value = getValue(key);
            if (null != value) {
                return value.toString();
            } else {
                return null;
            }
        }
    
        public static void setReverse(boolean reverse) {
            put(REVERSE, reverse);
    
        }
    
        public static boolean isReverse() {
            return getBoolean(REVERSE, false);
        }
    
        public static void setTableName(String tableName) {
            put(TABLE_NAME, tableName);
        }
    
        public static String getTableName() {
            return getString(TABLE_NAME);
        }
    
        public static void remove() {
            resources.remove();
        }
    }
    

    切面

    @Aspect
    public class HbaseTemplateReverseAspect {
    
        @Autowired
        @Qualifier("hbaseTemplateReverse")
        private HbaseOperations2 template2;
    
        @Autowired
        private HbaseTableNameProvider tableNameProvider;
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        @Pointcut("execution(public * com.navercorp.pinpoint.common.hbase.RowMapper.mapRow(..))")
        public void pointCut() {
        }
    
        @After("pointCut() && args(result,rowNum)")
        public void doBefore(Result result, int rowNum) {
            if (ThreadContext.isReverse()) {
                String tableNameStr = ThreadContext.getTableName();
                TableName tableName = tableNameProvider.getTableName(tableNameStr);
                Put put = new Put(result.getRow());
                Cell[] rawCells = result.rawCells();
                for (Cell cell : rawCells) {
                    put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp(), CellUtil.cloneValue(cell));
                }
                template2.asyncPut(tableName, put);
            }
        }
    
        @Before("execution(public * com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2.selectSpan(..))")
        public void selectSpan() {
            ThreadContext.setTableName(HbaseTable.TRACE_V2.getName());
        }
    
        @AfterReturning(returning = "tableName", pointcut = "execution(public * com.navercorp.pinpoint.common.hbase.TableDescriptor.getTableName())")
        public void getTableName(TableName tableName) {
            ThreadContext.setTableName(tableName.getNameAsString());
        }
    
        @Around("execution(public * com.navercorp.pinpoint.web.service.SpanService.selectSpan(..))")
        public Object dealContext(ProceedingJoinPoint jp) throws Throwable {
            Object result;
            try {
                ThreadContext.setReverse(true);
                result = jp.proceed();
            } finally {
                ThreadContext.remove();
            }
            return result;
        }
    }
    

    遗憾

    里面有些曲折,看上面代码也知道。
    com.navercorp.pinpoint.web.dao.hbase.HbaseTraceDaoV2#spanMapperV2 不是spring bean,而是手动构造的。
    这里我就没有继续弄下去了,因为感觉spring aop的局限性很大,计划用java instrumentation api,和pinpoint的agent一样重新弄下。

    代码提交备份在我的github上 https://github.com/tankilo/pinpoint/tree/spring-aop-hbase-reverse

    相关文章

      网友评论

          本文标题:如何复制Pinpoint中一条调用链的完整数据

          本文链接:https://www.haomeiwen.com/subject/xfwsuhtx.html