基于Dubbo的分布式链路追踪

作者: ZMMWMY | 来源:发表于2018-04-10 11:13 被阅读233次

    源码cloud分支
    假设已经了解了Dubbo的Filter,Spring Cloud Sleuth,ZipKin。
    其实openZipKin已经提供了该功能,请切换到dubbo分支
    但它是基于最新的spring cloud sleuth。spring cloud sleuth 2.0.0,我也没去了解最新的版本的代码,因为公司版本不是想升就升,所以造了一个轮子,其思想和源码差不多。
    另外使用spring cloud sleuth来链路追踪而不使用其他库,比如Brave。那就是它集成了将链路信息记录到LogBack的功能。
    基于Brave的Demo 请看源码master分支
    本文直接贴代码,有空会进行讲解


    • 定义Filter
    @Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
    public class MyTracingFilter implements Filter {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyTracingFilter.class);
    
        protected void addRequestTags(RpcContext context) {
            this.httpTraceKeysInjector.addRequestTags(context.getUrl().getAddress(),
                    context.getUrl().getHost(),
                    context.getUrl().getPath(),
                    context.getMethodName(),
                    Collections.emptyMap());
        }
    
        HttpTraceKeysInjector httpTraceKeysInjector;
        Tracer tracer;
        DubboExtractor dubboExtractor;
        DubboInject injector;
        SpanReporter spanReporter;
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            tracer = ApplicationBeanHolder.getBean(Tracer.class);
            if (tracer == null) {
                return invoker.invoke(invocation);
            } else {
                injectBean();
            }
    
    
            RpcContext rpcContext = RpcContext.getContext();
            Span span;
            String service = invoker.getInterface().getSimpleName();
            String method = RpcUtils.getMethodName(invocation);
            String spanName = service + "/" + method;
            if (rpcContext.isConsumerSide()) {
    
                span = tracer.createSpan(spanName);
                injector.inject(span, new DubboRequestTextMap(RpcContext.getContext()));
                addRequestTags(RpcContext.getContext());
                span.logEvent(Span.CLIENT_SEND);
                Result result;
                try {
                    result =  invoker.invoke(invocation);
                } finally {
                    closeSpan(span,true);
                }
                return result;
            } else {
                Span parentSpan = dubboExtractor.joinTrace(new DubboRequestTextMap(RpcContext.getContext()));
                if (parentSpan != null) {
                    span = parentSpan;
                    tracer.continueSpan(span);
                    span.logEvent(Span.SERVER_RECV);
                } else {
                    String header = RpcContext.getContext().getAttachment(Span.SPAN_FLAGS);
                    if (Span.SPAN_SAMPLED.equals(header)) {
                        span = tracer.createSpan(spanName, new AlwaysSampler());
                    } else {
                        span = tracer.createSpan(spanName);
                    }
                    span.logEvent(Span.SERVER_RECV);
                }
                Result result;
                try {
                    result =  invoker.invoke(invocation);
                } finally {
                    recordParentSpan(span);
                    closeSpan(span,false);
                }
                return result;
            }
    
        }
    
        private void recordParentSpan(Span parent) {
            if (parent == null) {
                return;
            }
            if (parent.isRemote()) {
                tracer.getCurrentSpan().logEvent(Span.SERVER_SEND);
                parent.stop();
                this.spanReporter.report(parent);
            }
        }
    
        private void injectBean() {
            injector = ApplicationBeanHolder.getBean(DubboInject.class);
            dubboExtractor = ApplicationBeanHolder.getBean(DubboExtractor.class);
            httpTraceKeysInjector = ApplicationBeanHolder.getBean(HttpTraceKeysInjector.class);
            spanReporter = ApplicationBeanHolder.getBean(SpanReporter.class);
        }
    
        private void closeSpan(Span span, Boolean type) {
            if (type) {
                tracer.getCurrentSpan().logEvent(Span.CLIENT_RECV);
            }
            if (span != null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Closing Dubbo span " + span);
                }
                tracer.close(span);
            }
        }
    }
    
    • DubboInject
    public class DubboInject implements SpanInjector<SpanTextMap> {
    
        private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();
    
        @Override
        public void inject(Span span, SpanTextMap map) {
            Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(map);
            setHeader(map, carrier, Span.TRACE_ID_NAME, span.traceIdString());
            setIdHeader(map, carrier, Span.SPAN_ID_NAME, span.getSpanId());
            setHeader(map, carrier, Span.SAMPLED_NAME, span.isExportable() ? Span.SPAN_SAMPLED : Span.SPAN_NOT_SAMPLED);
            setHeader(map, carrier, Span.SPAN_NAME_NAME, span.getName());
            setIdHeader(map, carrier, Span.PARENT_ID_NAME, getParentId(span));
            setHeader(map, carrier, Span.PROCESS_ID_NAME, span.getProcessId());
            for (Map.Entry<String, String> entry : span.baggageItems()) {
                map.put(prefixedKey(entry.getKey()), entry.getValue());
            }
        }
    
        private String prefixedKey(String key) {
            if (key.startsWith(Span.SPAN_BAGGAGE_HEADER_PREFIX
                    + DubboSpanMapper.HEADER_DELIMITER)) {
                return key;
            }
            return Span.SPAN_BAGGAGE_HEADER_PREFIX + DubboSpanMapper.HEADER_DELIMITER
                    + key;
        }
    
        private Long getParentId(Span span) {
            return !span.getParents().isEmpty() ? span.getParents().get(0) : null;
        }
    
        private void setIdHeader(SpanTextMap map, Map<String, String> carrier, String name, Long value) {
            if (value != null) {
                setHeader(map, carrier, name, Span.idToHex(value));
            }
        }
    
        private void setHeader(SpanTextMap map, Map<String, String> carrier, String name, String value) {
            if (StringUtils.hasText(value) && !carrier.containsKey(name)) {
                map.put(name, value);
            }
        }
    }
    
    • DubboExtractor
    public class DubboExtractor implements SpanExtractor<SpanTextMap> {
    
        private static final org.apache.commons.logging.Log log = LogFactory.getLog(
                MethodHandles.lookup().lookupClass());
    
        private static final String HTTP_COMPONENT = "http";
    
        private static final DubboSpanMapper SPAN_CARRIER_MAPPER = new DubboSpanMapper();
    
        private final Pattern skipPattern;
        private final Random random;
    
        public DubboExtractor(Pattern skipPattern) {
            this.skipPattern = skipPattern;
            this.random = new Random();
        }
    
        @Override
        public Span joinTrace(SpanTextMap textMap) {
            Map<String, String> carrier = SPAN_CARRIER_MAPPER.convert(textMap);
            boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
            boolean idToBeGenerated = debug && onlySpanIdIsPresent(carrier);
    
            if (!idToBeGenerated && traceIdIsMissing(carrier)) {
                return null;
            }
            try {
                return buildParentSpan(carrier, idToBeGenerated);
            } catch (Exception e) {
                log.error("Exception occurred while trying to extract span from carrier", e);
                return null;
            }
        }
    
        private boolean onlySpanIdIsPresent(Map<String, String> carrier) {
            return traceIdIsMissing(carrier) && spanIdIsPresent(carrier);
        }
    
        private boolean traceIdIsMissing(Map<String, String> carrier) {
            return carrier.get(Span.TRACE_ID_NAME) == null;
        }
    
        private boolean spanIdIsPresent(Map<String, String> carrier) {
            return carrier.get(Span.SPAN_ID_NAME) != null;
        }
    
        private String generateId() {
            return Span.idToHex(this.random.nextLong());
        }
    
        private long spanId(String spanId, String traceId) {
            if (spanId == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Request is missing a span id but it has a trace id. We'll assume that this is "
                            + "a root span with span id equal to the lower 64-bits of the trace id");
                }
                return Span.hexToId(traceId);
            } else {
                return Span.hexToId(spanId);
            }
        }
    
        private Span buildParentSpan(Map<String, String> carrier, boolean idToBeGenerated) {
            String traceId = carrier.get(Span.TRACE_ID_NAME);
            if (traceId == null) {
                traceId = generateId();
            }
            Span.SpanBuilder span = Span.builder()
                    .traceIdHigh(traceId.length() == 32 ? Span.hexToId(traceId, 0) : 0)
                    .traceId(Span.hexToId(traceId))
                    .spanId(spanId(carrier.get(Span.SPAN_ID_NAME), traceId));
            String parentName = carrier.get(Span.SPAN_NAME_NAME);
            if (StringUtils.hasText(parentName)) {
                span.name(parentName);
            } else {
                span.name(HTTP_COMPONENT + ":/parent"
                        + carrier.get(DubboSpanMapper.URI_HEADER));
            }
            String processId = carrier.get(Span.PROCESS_ID_NAME);
            if (StringUtils.hasText(processId)) {
                span.processId(processId);
            }
            String parentId = carrier.get(Span.PARENT_ID_NAME);
            if (parentId != null) {
                span.parent(Span.hexToId(parentId));
            }
            span.remote(true);
    
    //        boolean skip = this.skipPattern
    //                .matcher(carrier.get(DubboSpanMapper.URI_HEADER)).matches()
    //                || Span.SPAN_NOT_SAMPLED.equals(carrier.get(Span.SAMPLED_NAME));
            boolean skip = false;
            // trace, span id were retrieved from the headers and span is sampled
            span.exportable(!(skip || idToBeGenerated));
            boolean debug = Span.SPAN_SAMPLED.equals(carrier.get(Span.SPAN_FLAGS));
            if (debug) {
                span.exportable(true);
            } else if (skip) {
                span.exportable(false);
            }
            for (Map.Entry<String, String> entry : carrier.entrySet()) {
                if (entry.getKey().toLowerCase()
                        .startsWith(DubboSpanMapper.BAGGAGE_PREFIX)) {
                    span.baggage(unprefixedKey(entry.getKey()), entry.getValue());
                }
            }
            return span.build();
        }
    
        private String unprefixedKey(String key) {
            return key.substring(key.indexOf(DubboSpanMapper.HEADER_DELIMITER) + 1)
                    .toLowerCase();
        }
    }
    
    
    • DubboRequestTextMap
    public class DubboRequestTextMap implements SpanTextMap {
    
        private final RpcContext delegate;
    
        public DubboRequestTextMap(RpcContext delegate) {
            this.delegate = delegate;
        }
    
        @Override
        public Iterator<Map.Entry<String, String>> iterator() {
            return this.delegate.getAttachments().entrySet().iterator();
        }
    
        @Override
        public void put(String key, String value) {
            if (!StringUtils.hasText(value)) {
                return;
            }
            this.delegate.getAttachments().put(key, value);
        }
    }
    
    • DubboSpanMapper
    public class DubboSpanMapper {
    
        static final String HEADER_DELIMITER = "-";
        static final String BAGGAGE_PREFIX = Span.SPAN_BAGGAGE_HEADER_PREFIX
                + HEADER_DELIMITER;
        static final String URI_HEADER = "X-Span-Uri";
    
        private static Comparator<String> IGNORE_CASE_COMPARATOR = new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return o1.toLowerCase().compareTo(o2.toLowerCase());
            }
        };
    
        /**
         * Acceptable span fields
         */
        private static final Set<String> SPAN_FIELDS;
    
        static {
            TreeSet<String> fields = new TreeSet<>(IGNORE_CASE_COMPARATOR);
            Collections.addAll(fields, Span.SPAN_FLAGS, Span.TRACE_ID_NAME, Span.SPAN_ID_NAME,
                    Span.PROCESS_ID_NAME, Span.SPAN_NAME_NAME, Span.PARENT_ID_NAME,
                    Span.SAMPLED_NAME, URI_HEADER);
            SPAN_FIELDS = Collections.unmodifiableSet(fields);
        }
    
        /**
         * Create new Map of carrier values
         */
        Map<String, String> convert(SpanTextMap textMap) {
            Map<String, String> carrier = new TreeMap<>(IGNORE_CASE_COMPARATOR);
            for (Map.Entry<String, String> entry : textMap) {
                if (isAcceptable(entry.getKey())) {
                    carrier.put(entry.getKey(), entry.getValue());
                }
            }
            return Collections.unmodifiableMap(carrier);
        }
    
        private boolean isAcceptable(String key) {
            return SPAN_FIELDS.contains(key) || key.startsWith(BAGGAGE_PREFIX);
        }
    }
    

    相关文章

      网友评论

        本文标题:基于Dubbo的分布式链路追踪

        本文链接:https://www.haomeiwen.com/subject/ihpghftx.html