美文网首页Java 杂谈JavaJava架构技术进阶
深入探究ZIPKIN调用链跟踪——链路上报篇

深入探究ZIPKIN调用链跟踪——链路上报篇

作者: PioneerYi | 来源:发表于2019-08-20 10:51 被阅读7次

    前言:ZIPKIN作为当前“分布式服务链路跟踪”问题的流行解决方案之一,正在被越来越多的公司和个人学习使用。其中很重要的一块,就是上报链路数据。那么知道服务端如何接收数据,以及我们该怎样上报数据到服务端就显得十分重要。虽然ZIPKIN官方也开源了一个客户端Brave,但是本文却并不想直接介绍Brave,而是想站在一个从零开发ZIPKIN客户端的角度,一层层分析解决如何自己写一个ZIPKIN客户端,直到最后引出Brave。本文最终想达到的效果,就是希望通过本文,能让大家对ZIPKIN的链路上报有一个详尽的理解和认识。

    注:本文也发布于我的CSDN博客:深入探究ZIPKIN调用链跟踪——链路上报篇

    零行代码快速开始

    ZIPKIN是Spring Boot服务,因此启动起来十分方便,直接运行ZIPKIN JAR包就可以了。ZIPKIN JAR包我们可以自行编译ZIPKIN源码获得,也可以从下面的仓库获取,这个仓库专门存放ZIPKIN各种编译好的JAR包,仓库地址为:

    https://dl.bintray.com/openzipkin/maven/io/zipkin/java/
    

    进入zipkin-server目录下载server jar包,比如下载2.11.5版本,运行jar包:

    java -jar zipkin-server-2.11.5-exec.jar
    

    这样本地就起了一个ZIPKIN服务,浏览器中输入http://localhost:9411/zipkin/,即可打开ZIPKIN首页,效果如下:

    ZIPKIN 首页

    接下来,我们开始上报数据,现在我不想写任何代码,那么就用postman发起一次post请求上报数据吧:


    POSTMAN 上报数据

    上报后在本地ZIPKIN服务上即可看到刚上报的数据效果:


    上报数据展示

    没有任何一句代码,一个完整的ZIPKIN数据上报存储展示就完成了,那么我们来思考下:

    1、如果自己写一个上ZIPKIN客户端,该如何写?

    分析:要上传数据给服务端,那么必须要搞清楚,ZIPKIN服务端可以接受什么样格式的数据?支持的编码解码协议是什么?还有支持那些通信传输协议?

    Task: 了解了服务怎么接收数据后,客户端的task也就明确了:
    1)以ZIPKIN服务端支持的数据格式组织数据;
    2)以ZIPKIN服务端支持的编解码协议编码数据;
    3)以ZIPKIN服务端支持的传输协议上报数据。

    2、基础ZIPKIN客户端完成后,如何适配各种组件?

    描述:一个服务通常涉及多个组件,包括服务框架,消息中间件,各种数据库等,那么怎么上报这些组件的数据给服务端了,值得我们思考?

    分析:其实最简单的方法就是采用一个装饰者模式包装一下组件接口,在其中加入上报的逻辑,但是这种方法局限很大,不灵活。除此之外,我们或许我们可以利用下拦截器技术,AOP技术,以及Agent,探针等技术做到无侵入上报数据。

    服务端如何接收数据的?

    要想自己写一个ZIPKIN客户端,必须搞清楚ZIPKIN服务端是怎么接收数据的,包括数据格式协议是怎样的?编解码协议是怎样的?还有支持那么传输协议?

    ZIPKIN支持的数据格式是怎样的?

    ZIPKIN是兼容OpenTracing标准的,OpenTracing规定,每个Span包含如下状态:

    • 操作名称
    • 起始时间
    • 结束时间
    • 一组 KV 值,作为阶段的标签(Span Tags)
    • 阶段日志(Span Logs)
    • 阶段上下文(SpanContext),其中包含 Trace ID 和 Span ID
    • 引用关系(References)

    OpenTracing规范与zipkin对应关系如下:

    OpenTracing ZIPKIN
    操作名称 name
    起始时间 timestamp
    结束时间 timestamp+duration
    标签 tags
    Span上下文 traceId; id
    引用关系 parentId
    Span日志

    除此之外,ZIPKIN SAPN还增加了其他几个字端:

    字端 描述
    Kind kind Spanl类型,比如是Server还是Client
    Annotaion 表示某个时间点发生的Event,Event类型:cs:Client Send 请求;sr:Server Receive到请求;ss:Server 处理完成、并Send Response;cr:Client Receive 到响应
    Endpoint localEndpoint 描述本地服务信息,比如服务名,ip等,方便根据服务名检索链路
    Endpoint remoteEndpoint RPC调用时,描述远程服务的服务信息

    最终,包含兼容OpenTracing标准的字端,以及其本身的一些字端后,zipkin的span数据字端如下:


    ZIPKIN Span

    了解了ZIPKIN的数据字端格式后,我们再看看ZIPKIN支持的编解码协议。

    ZIPKIN支持那些编解码协议?

    Zipkin主要支持三种编解码协议,分别为JSON, PROTO3, THRIFT。

    JSON编解码如下:

    JSON_V1 {
        public Encoding encoding() {
            return Encoding.JSON;
        }
    
        public boolean decode(byte[] bytes, Collection<Span> out) {
            Span result = this.decodeOne(bytes);
            if (result == null) {
                return false;
            } else {
                out.add(result);
                return true;
            }
        }
    
        public boolean decodeList(byte[] spans, Collection<Span> out) {
            return (new V1JsonSpanReader()).readList(spans, out);
        }
    
        public Span decodeOne(byte[] span) {
            V1Span v1 = (V1Span)JsonCodec.readOne(new V1JsonSpanReader(), span);
            List<Span> out = new ArrayList(1);
            V1SpanConverter.create().convert(v1, out);
            return (Span)out.get(0);
        }
    
        public List<Span> decodeList(byte[] spans) {
            return decodeList(this, spans);
        }
    }
    
    JSON_V2 {
        public Encoding encoding() {
            return Encoding.JSON;
        }
    
        public boolean decode(byte[] span, Collection<Span> out) {
            return JsonCodec.read(new V2SpanReader(), span, out);
        }
    
        public boolean decodeList(byte[] spans, Collection<Span> out) {
            return JsonCodec.readList(new V2SpanReader(), spans, out);
        }
    
        @Nullable
        public Span decodeOne(byte[] span) {
            return (Span)JsonCodec.readOne(new V2SpanReader(), span);
        }
    
        public List<Span> decodeList(byte[] spans) {
            return decodeList(this, spans);
        }
    }
    
    

    THRIFT编解码如下:

    THRIFT {
        public Encoding encoding() {
            return Encoding.THRIFT;
        }
    
        public boolean decode(byte[] span, Collection<Span> out) {
            return ThriftCodec.read(span, out);
        }
    
        public boolean decodeList(byte[] spans, Collection<Span> out) {
            return ThriftCodec.readList(spans, out);
        }
    
        public Span decodeOne(byte[] span) {
            return ThriftCodec.readOne(span);
        }
    
        public List<Span> decodeList(byte[] spans) {
            return decodeList(this, spans);
        }
    }
    

    PROTO3编解码如下:

    PROTO3 {
        public Encoding encoding() {
            return Encoding.PROTO3;
        }
    
        public boolean decode(byte[] span, Collection<Span> out) {
            return Proto3Codec.read(span, out);
        }
    
        public boolean decodeList(byte[] spans, Collection<Span> out) {
            return Proto3Codec.readList(spans, out);
        }
    
        @Nullable
        public Span decodeOne(byte[] span) {
            return Proto3Codec.readOne(span);
        }
    
        public List<Span> decodeList(byte[] spans) {
            return decodeList(this, spans);
        }
    }
    

    其中Json是默认支持的,也是使用起来最方便的,除此之外,还包括Thirft和Proto3可供开发者选择。

    ZIPKIN支持那些传输协议?

    Zipkin默认支持Http协议,除此之外,它还支持kafka,rabbitmq以及scribe协议:


    ZIPKIN支持的协议

    他们的初始化过程如下:


    Collector初始化g

    传输协议支持的编解码协议如下:


    传输协议支持的编解码协议情况

    其中Scribe限定了只支持Thirft协议,而HTTP、Kafka和RabbitMQ则是三种协议都支持。

    如何做到支持所有的编码解码协议了?ZIPKIN中提供了一个自动探测编解码的类SpanBytesDecoderDetector,其中核心方法如下:

    static BytesDecoder<Span> detectDecoder(byte[] bytes) {
        if (bytes[0] <= 16) { // binary format
        if (protobuf3(bytes)) return SpanBytesDecoder.PROTO3;
            return SpanBytesDecoder.THRIFT; /* the first byte is the TType, in a range 0-16 */
        } else if (bytes[0] != '[' && bytes[0] != '{') {
            throw new IllegalArgumentException("Could not detect the span format");
        }
        if (contains(bytes, ENDPOINT_FIELD_SUFFIX)) return SpanBytesDecoder.JSON_V2;
        if (contains(bytes, TAGS_FIELD)) return SpanBytesDecoder.JSON_V2;
        return SpanBytesDecoder.JSON_V1;
    }
    

    现在我们已经知道了zipkin服务接收的数据格式以及编解码协议和传输协议,那么接下来就可以写一个客户端了!

    自己写一个ZIPKIN客户端

    对于服务端如何接收数据,有了一个全面的认识后,我们就可以着手开始写一个ZIPKIN客户端了。

    那么,首先定义客户端上报的数据格式,最简单的方式就是定义一个跟ZIPKIN服务端一样数据格式的Span就可以了:

    @Setter
    @Getter
    public static class MySapn {
        private String traceId;
        private String parentId;
        private String id;
        private String name;
        private long timestamp;
        private long duration;
        private Map<String, String> tags;
        String kind;
        Endpoint localEndpoint, remoteEndpoint;
    
        public static enum Kind {
            CLIENT,
            SERVER,
            PRODUCER,
            CONSUMER
        }
    
        public static class Endpoint {
            String serviceName, ipv4, ipv6;
            byte[] ipv4Bytes, ipv6Bytes;
            int port; // zero means null
    
            public Endpoint(String serviceName) {
                this.serviceName = serviceName;
            }
        }
    }
    

    数据格式确定后,接着就编码数据,ZIPKIN支持三种编码方式,JSON、THIFT和PROTO3,为了简单方便,我们选择JSON协议编码Span数据。注意,ZIPKIN JSON字符串前后需要加括号。

    数据编码后,接着上报数据,ZIPKIN默认支持HTTP协议方式,JAVA HTTP请求包很多,我们随便选择一种,比如选择Apach的HttpClient jar包,代码如下:

    public class App {
    
        private static final String serverUrl = "http://localhost:9411/api/v2/spans";
    
        public static void main(String[] args) {
            MySapn span = new MySapn();
            span.traceId = "1ae1e4f435814744";
            span.parentId = "1ae1e4f435814744";
            span.id = "d1ab9cd2c50d13d1";
            span.kind = MySapn.Kind.SERVER.toString();
            span.name = "my client test";
            span.timestamp = 1565933251470428L;
            span.duration = 8286;
            span.localEndpoint = new MySapn.Endpoint("My client");
            Map<String, String> tags = new HashMap<>();
            tags.put("name", "pioneeryi");
            tags.put("lover", "dandan");
            span.tags = tags;
    
            doPost(serverUrl, span);
            System.out.println("Hello World!");
        }
    
        public static void doPost(String url, MySapn span) {
            try {
                HttpClient httpClient = new DefaultHttpClient();
    
                HttpPost post = new HttpPost(url);
                post.setHeader("Content-Type", "application/json");
                post.setHeader("charset", "UTF-8");
    
                String body = new Gson().toJson(span);
                body = "[" + body + "]";
                System.out.print(body);
    
                StringEntity entity = new StringEntity(body);
                post.setEntity(entity);
    
                HttpResponse httpResponse = httpClient.execute(post);
                System.out.print(httpResponse);
            } catch (Exception exception) {
                System.out.print("do post request fail");
            }
        }
    }
    

    maven pom如下:

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.6</version>
    </dependency>
    
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    

    运行上面main方法,即可上报Span数据,打开zipkin首页:http://localhost:9411/zipkin/,搜索刚上报的Span的TraceId,展示效果如下:

    上报数据展示

    一个最简单的,采用JSON编码数据,HTTP协议上传数据的客户端我们就完成了。为了用户调用方便,我们可以将上面的代码封装为一个接口供用户使用:

    public void reportSpan(MySpan span)
    

    但是这个太简陋了,我们只支持了一种一个编解码协议,一种传输协议,开始优化:

    优化一:支持多种编解码,支持多种传输协议。这个时候就需要定一个上报接口类了,根据不同传输协议提供不同实现。编解码也是同样的,定义编码接口,根据不同编码协议提供不同实现。

    此外,当前这个客户端是同步上报的,性能很差,因此必须改成异步上报,接着优化:

    优化二:上报改成异步上报,队列+线程。

    有了这两步优化后,client大体框架就初步成型了,写好了客户端后,怎么适配各个组件,做到无侵入上报也很重要,继续优化:

    优化三:适配各个组件,比如Spring Boot,Kafak,MySql等等。

    一个完整的Client,还是有很多工作要做的,这里咱就不继续深入优化开发了,直接看看官方的Brave怎么做的!

    探究ZIPKIN客户端Brave

    为了说明Brave的使用和上报过程,我们先写一个很简单的上报Demo,进行演示。Demo将上报一个“一父Span,两个子Span“的链路,demo如下:

    public class TraceDemo {
        public static void main(String[] args) {
            Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
            AsyncReporter asyncReporter = AsyncReporter.create(sender);
            Tracing tracing = Tracing.newBuilder()
                .localServiceName("my-service")
                .spanReporter(asyncReporter)
                .build();
            Tracer tracer = tracing.tracer();
    
            Span parentSpan = tracer.newTrace().name("parent span").start();
            
            Span childSpan1 = tracer.newChild(parentSpan.context()).name("child span1").start();
            sleep(500);
            childSpan1.finish();
    
            Span childSpan2 = tracer.newChild(parentSpan.context()).name("child span2").start();
            sleep(500);
            childSpan2.finish();
            
            parentSpan.finish();
        }
    }
    

    其中 maven pom 引入如下包:

    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave</artifactId>
        <version>5.3.3</version>
    </dependency>
    
    <dependency>
        <groupId>io.zipkin.reporter2</groupId>
        <artifactId>zipkin-sender-okhttp3</artifactId>
        <version>2.6.0</version>
    </dependency>
    

    启动zipkin 服务:

    java -jar zipkin-server-2.11.5-exec.jar
    

    然后在浏览器中输入:http://localhost:9411 ,即可打开zipkin首页,看到示例代码上报的效果图如下:

    上报数据展示

    一个最简单的但是却是很完整的一个ZIPKIN链路上报演示,就如上面Demo所示,那么接下来分析一下整个链路的上报过程!

    链路上报解析

    自己开发一个客户端时,我们首先会封装一个Span类,Brave也不例外,它也定义了Span数据结构;那么定义好Span后,谁来负责构造Span了?

    Brave中定义了一个类叫Tracer来完成构造Span的工作;Brave生成好了Span后,此时需要编码发送了,那么谁又来发送了?

    Brave定义了Reporter组件,它支持异步发送以及多传输协议以及多编码协议发送Span数据。

    看起来,各个组件已经完备了,组件有点多!此时需要那么一个人将这些组件组织起来,同时与服务端取得联系,开始打通整个流程了,这就是Tracing类的功能。

    接下来详细讲解一下各个组件的功能,并根据Demo代码,将各个组件串起来,最终梳理清楚Brave上报的流程和原理。

    Brave Span

    Brave中Span相关类有:SpanCustomizer、NoopSpanCustomizer、CurrentSpanCustomizer、RealSpanCustomizer、Span、NoopSpan、RealSpan。

    示例代码中,关于span的操作如下:

    Span span = tracer.newTrace().name("encode").start();
    try {
      doSomethingExpensive();
    } finally {
      span.finish();
    }
    

    首先通过tracer生成一个span,最后,调用span.finish()上报,接下来就来看看span,以及这个finish干了什么。

    咱们用的Span的实现子类为RealSpan,该类依赖几个核心类:

    RealSpan两个核心方法start, finish:

    @Override 
    public Span start() {
        return start(clock.currentTimeMicroseconds());
    }
    
    @Override 
    public Span start(long timestamp) {
        synchronized (state) {
            state.startTimestamp(timestamp);
        }
        return this;
    }
    

    start方法主要记录开始时间,接下来看看finish 方法:

    @Override 
    public void finish(long timestamp) {
        if (!pendingSpans.remove(context)) return;
        synchronized (state) {
            state.finishTimestamp(timestamp);
        }
        finishedSpanHandler.handle(context, state);
    }
    

    这里交给FinishedSpanHandler来处理。FinishedSpanHandler是一个抽象类,他有如下子类实现:

    • ZipkinFinishedSpanHandler
    • MetricsFinishedSpanHandler
    • NoopAwareFinishedSpan
    • CompositeFinishedSpanHandler

    我们主要关注ZipkinFinishedSpanHandler实现:

    public final class ZipkinFinishedSpanHandler extends FinishedSpanHandler {
        final Reporter<zipkin2.Span> spanReporter;
        final MutableSpanConverter converter;
    
        public ZipkinFinishedSpanHandler(Reporter<zipkin2.Span> spanReporter,
        ErrorParser errorParser, String serviceName, String ip, int port) {
            this.spanReporter = spanReporter;
            this.converter = new MutableSpanConverter(errorParser, serviceName, ip, port);
        }
    
        @Override public boolean handle(TraceContext context, MutableSpan span) {
            if (!Boolean.TRUE.equals(context.sampled())) return true;
    
            Span.Builder builderWithContextData = Span.newBuilder()
                .traceId(context.traceIdString())
                .parentId(context.parentIdString())
                .id(context.spanIdString());
            if (context.debug()) builderWithContextData.debug(true);
    
            converter.convert(span, builderWithContextData);
            spanReporter.report(builderWithContextData.build());
            return true;
        }
    
        @Override public String toString() {
            return spanReporter.toString();
        }
    }
    

    可见,上面最终是通过Reporter组件来上报数据的。那么Report是如何上报的了?

    Brave Reporter

    因为上报组件要支持多种编码协议以及多种传输协议,因此逻辑比较复杂,官方专门建了一个项目:zipkin-reporter-java

    我们首先看看Reporter接口类定义:

    public interface Reporter<S> {
        Reporter<Span> NOOP = new Reporter<Span>() {
            @Override public void report(Span span) {
            }
    
            @Override public String toString() {
            return "NoopReporter{}";
            }
        };
        Reporter<Span> CONSOLE = new Reporter<Span>() {
            @Override public void report(Span span) {
                 System.out.println(span.toString());
            }
    
            @Override public String toString() {
                return "ConsoleReporter{}";
            }
        };
    
        /**
        * Schedules the span to be sent onto the transport.
        *
        * @param span Span, should not be <code>null</code>.
        */
        void report(S span);
    }
    

    Reporter有三个子类实现,分别是CONSOLE,NOOP,和AsyncReporter。CONSOLE就是直接控制台打印出Span数据,一般用来调试差不多;NOOP是一个空实现,啥也不干;AsyncReporter是我们平时上报用的Reporter,他提供异步上报数据能力。

    AsyncReporter

    AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second before flushes any pending spans out of process via a Sender.

    根据不同协议,Ayncreporter执行相应的build逻辑:

    public AsyncReporter<Span> build() {
        switch(this.sender.encoding()) {
            case JSON:
                return this.build(SpanBytesEncoder.JSON_V2);
            case PROTO3:
                return this.build(SpanBytesEncoder.PROTO3);
            case THRIFT:
                return this.build(SpanBytesEncoder.THRIFT);
            default:
                throw new UnsupportedOperationException(this.sender.encoding().name());
        }
    }
    

    build方法详细逻辑,如下:

    public <S> AsyncReporter<S> build(BytesEncoder<S> encoder) {
        if (encoder == null) throw new NullPointerException("encoder == null");
    
        if (encoder.encoding() != sender.encoding()) {
            throw new IllegalArgumentException(String.format(
        "Encoder doesn't match Sender: %s %s", encoder.encoding(), sender.encoding()));
        }
    
        final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);
    
        if (messageTimeoutNanos > 0) { 
            // Start a thread that flushes the queue in a loop.
            final BufferNextMessage<S> consumer =
            BufferNextMessage.create(encoder.encoding(), messageMaxBytes, messageTimeoutNanos);
            Thread flushThread = threadFactory.newThread(new Flusher<>(result, consumer));
            flushThread.setName("AsyncReporter{" + sender + "}");
            flushThread.setDaemon(true);
            flushThread.start();
        }
        return result;
    }
    
    

    可以看到AsyncReporter的build方法中,启动了一个守护线程flushThread,一直循环调用BoundedAsyncReporter的flush方法:

    void flush(BufferNextMessage<S> bundler) {
        if (closed.get()) throw new IllegalStateException("closed");
        pending.drainTo(bundler, bundler.remainingNanos());
    
        // record after flushing reduces the amount of gauge events vs on doing this on report
        metrics.updateQueuedSpans(pending.count);
        metrics.updateQueuedBytes(pending.sizeInBytes);
    
        // loop around if we are running, and the bundle isn't full
        // if we are closed, try to send what's pending
        if (!bundler.isReady() && !closed.get()) return;
    
        // Signal that we are about to send a message of a known size in bytes
        metrics.incrementMessages();
        metrics.incrementMessageBytes(bundler.sizeInBytes());
        ArrayList<byte[]> nextMessage = new ArrayList<>(bundler.count());
        bundler.drain(new SpanWithSizeConsumer<S>() {
            @Override public boolean offer(S next, int nextSizeInBytes) {
                nextMessage.add(encoder.encode(next)); // speculatively add to the pending message
                if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) {
                    // if we overran the message size, remove the encoded message.
                    nextMessage.remove(nextMessage.size() - 1);
                    return false;
                }
                return true;
            }
        });
    
        try {
            sender.sendSpans(nextMessage).execute();
        } catch (IOException | RuntimeException | Error t) {
            ......
        }
    }
    

    Flush方法的主要逻辑如下:

    • 将队列pending中的数据,提取到nextMessage链表中;
    • 调用Sender的sendSpans方法,发送到nextMessage链表中的Span数据到Zipkin;

    这样,Reporter即做到了异步发送!

    Sender

    Sender组件完成发送Span到zipkin服务端的最后一步,即利用某个传输协议,将数据发送到zipkin服务端。

    public abstract class Sender extends Component {
        public Sender() {
        }
        
        public abstract Encoding encoding();
        
        public abstract int messageMaxBytes();
        
        public abstract int messageSizeInBytes(List<byte[]> var1);
        
        public int messageSizeInBytes(int encodedSizeInBytes) {
            return this.messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes]));
        }
        
        public abstract Call<Void> sendSpans(List<byte[]> var1);
    }
    

    其中核心的方法sendSpans是一个抽象方法,不同传输协议的Sender会提供具体的实现逻辑,其子类有:
    ActiveMQSender、FakeSender、KafkaSender、LibthriftSender、OkHttpSender、RabbitMQSender、URLConnectionSender。

    不同协议均按照自身协议规范执行发送逻辑,因为我们的Demo中用的是OkHttpSender,所以我们主要看看OkHttpSender是如何实现的。Demo中使用如下:

    Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
    AsyncReporter asyncReporter = AsyncReporter.create(sender);
    

    这里,通过AsyncReporter.create方法,我们将OkHttpSender注入到了Reporter中,那么接下来看看OkHttpSender的sendSpans方法实现:

    @Override 
    public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
        if (closeCalled) throw new IllegalStateException("closed");
        Request request;
        try {
            request = newRequest(encoder.encode(encodedSpans));
        } catch (IOException e) {
            throw zipkin2.internal.Platform.get().uncheckedIOException(e);
        }
        return new HttpCall(client.newCall(request));
    }
    

    执行完这个方法后,会返回一个HttpCall,Reporter的flush方法中会调用HttpCall的execute方法,完成Http请求发送。

    Brave Tracer

    Span数据结构,包括发送Span的组件我们搞清楚了,那么谁来负责创建Span了?这就是Tracer的工作,他负责创建Span及提供Span的各种操作,主要方法如下表所示:

    方法名 描述
    Span newTrace() 创建一个Root Span
    Span joinSpan(TraceContext context) 公用一个SpanId,主要存在于RPC场景中
    Span newChild(TraceContext parent) 创建一个子Span
    Span nextSpan(TraceContextOrSamplingFlags extracted) 基于请求的参数信息创建一个新的Span
    Span toSpan(TraceContext context) 通过TraceContext创建一个Span
    Span currentSpan() 获取当前Span
    Span nextSpan() 基于当前Span生成一个子Span

    Brave Tracing

    现在Span有了,创建Span的组件有了,发送Span的组件也有了,那就只需要一个把他们组合起来的类似工厂的角色了,那就是Tracing,他的主要工作就是连接服务器,然后利用Tracer创建出Span,接着发送Span到zipkin服务端。

    Tracing源码采用的Builder模式,再看看我们Demo中创建Tracing的代码:

    Tracing tracing = Tracing.newBuilder()
                .localServiceName("my-service")
                .spanReporter(asyncReporter)
                .build();
    

    我们Tracing.newBuilder()创建了一个Tracing的Builder,然后指定了这个Tracing的服务名,使用什么Reporter,接着调用了Builder的build方法,我们看看build方法代码:

    public Tracing build() {
        if (clock == null) clock = Platform.get().clock();
        if (localIp == null) localIp = Platform.get().linkLocalIp();
        if (spanReporter == null) spanReporter = new LoggingReporter();
        return new Default(this);
    }
    

    它调用了Tracing的默认实现,默认实现子类如下:

    static final class Default extends Tracing {
        final Tracer tracer;
        final Propagation.Factory propagationFactory;
        final Propagation<String> stringPropagation;
        final CurrentTraceContext currentTraceContext;
        final Sampler sampler;
        final Clock clock;
        final ErrorParser errorParser;
    
        Default(Builder builder) {
          this.clock = builder.clock;
          this.errorParser = builder.errorParser;
          this.propagationFactory = builder.propagationFactory;
          this.stringPropagation = builder.propagationFactory.create(Propagation.KeyFactory.STRING);
          this.currentTraceContext = builder.currentTraceContext;
          this.sampler = builder.sampler;
          zipkin2.Endpoint localEndpoint = zipkin2.Endpoint.newBuilder()
              .serviceName(builder.localServiceName)
              .ip(builder.localIp)
              .port(builder.localPort)
              .build();
          SpanReporter reporter = new SpanReporter(localEndpoint, builder.reporter, noop);
          this.tracer = new Tracer(
              builder.clock,
              builder.propagationFactory,
              reporter,
              new PendingSpans(localEndpoint, clock, reporter, noop),
              builder.sampler,
              builder.errorParser,
              builder.currentTraceContext,
              builder.traceId128Bit || propagationFactory.requires128BitTraceId(),
              builder.supportsJoin && propagationFactory.supportsJoin(),
              noop
          );
          maybeSetCurrent();
        }
    

    从上面可以看到,主要干的工作有:

    • 根据Spanreporter,生成FinishedSpanHandler,发送Span用;
    • 根据FinishedSpanHandler以及其他默认信息生成Tracer;

    OK,现在对于DEMO中的Brave的上报数据流程和原理是不是清楚了不少!

    链路上报总结

    Zipkin链路上报看起来很复杂,其实剥离各种封装,去除各种组件,其主线逻辑就是如下三步:

    • 构造span对象,包括traceId,parentId,以及其自身的spanId等参数;
    • 选一种编解码协议,比如JSON,或者THRIF,或者PROTO3对Span进行编码;
    • 将编码后的span,通过利用一种传输协议上报到服务端;

    还有一块未分析:Brave是如何Support各个组件的。因为本文内容较多,放到以后的文章分析!

    OK,ZIPKIN 链路上报分析到此为止,祝大家工作顺利,天天开心!

    相关文章

      网友评论

        本文标题:深入探究ZIPKIN调用链跟踪——链路上报篇

        本文链接:https://www.haomeiwen.com/subject/hwuusctx.html