A Deep Dive into Zipkin Call-Chain Tracing: Storage and Retrieval


Author: PioneerYi | Published 2019-08-14 20:41

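    Zipkin is a popular distributed call-chain tracing solution whose storage layer supports several backends, including Elasticsearch, Cassandra, and MySQL. How does it support multiple backends so flexibly, and how does it store and retrieve service call-chain data efficiently? Both questions are worth a closer look.

    Figure 1 shows a simplified view of the Zipkin architecture. Each service reports its call-chain data to the Zipkin server through the Zipkin client SDK. When the server's Collector receives the data, it writes it to the database.


    Figure 1: Zipkin architecture

    Reporting and storing this data involves several Zipkin classes and components, chiefly Collector, StorageComponent, SpanStore, SpanConsumer, and their subclasses. Figure 2 shows the core class diagram for storage and retrieval.


    Figure 2: Zipkin storage and retrieval class diagram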

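    1. Storage Component

    Zipkin's data read/write path has three parts: the underlying storage component, data retrieval, and data storage. Both reads and writes depend on the underlying storage. The sections below look at each part in turn.

    1.1 Initializing the storage component

    StorageComponent is the core component for storage and retrieval in Zipkin. It exposes the interfaces for storing and querying spans and aggregations.

    StorageComponent definition

    StorageComponent is an abstract class, defined as follows: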

    public abstract class StorageComponent extends Component {
        public abstract SpanStore spanStore();
        public abstract SpanConsumer spanConsumer();
        public static abstract class Builder {
            public abstract Builder strictTraceId(boolean strictTraceId);
            public abstract Builder searchEnabled(boolean searchEnabled);
            public abstract StorageComponent build();
        }
    }
    

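    StorageComponent exposes two accessor methods, one returning a SpanStore and one returning a SpanConsumer. The concrete SpanStore and SpanConsumer types depend on the underlying storage backend.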
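
    To make that split concrete, here is a minimal sketch of a storage backend that hands out a write side and a read side over the same data. All types below (SketchSpan, SketchConsumer, SketchStore, InMemorySketchStorage) are simplified stand-ins invented for illustration. They return plain values rather than Zipkin's Call type, and this is not Zipkin's own in-memory storage.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a span: only the fields this sketch needs.
final class SketchSpan {
  final String traceId;
  final String name;

  SketchSpan(String traceId, String name) {
    this.traceId = traceId;
    this.name = name;
  }
}

// Write side: accepts spans for persistence.
interface SketchConsumer {
  void accept(List<SketchSpan> spans);
}

// Read side: looks up the spans that belong to one trace.
interface SketchStore {
  List<SketchSpan> getTrace(String traceId);
}

// One component exposes both sides, backed by the same list.
final class InMemorySketchStorage {
  private final List<SketchSpan> spans = new ArrayList<>();

  SketchConsumer spanConsumer() {
    return incoming -> {
      synchronized (spans) {
        spans.addAll(incoming);
      }
    };
  }

  SketchStore spanStore() {
    return traceId -> {
      List<SketchSpan> result = new ArrayList<>();
      synchronized (spans) {
        for (SketchSpan span : spans) {
          if (span.traceId.equals(traceId)) result.add(span);
        }
      }
      return result;
    };
  }
}
```

    Callers only ever see the two small interfaces, which is what lets a different backend be swapped in without touching the collector or the query API.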

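    StorageComponent initialization

    Zipkin supports four storage options: Elasticsearch, Cassandra, MySQL, and in-memory. StorageComponent can therefore be initialized as any of the four. Let's walk through initialization with Elasticsearch.


    Figure 3: Elasticsearch storage initialization

    The main class that initializes Elasticsearch storage is ZipkinElasticsearchStorageAutoConfiguration, defined as follows: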

    @Configuration
    @EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
    @ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
    @ConditionalOnMissingBean(StorageComponent.class)
    // intentionally public for import by zipkin-autoconfigure-storage-elasticsearch-aws
    public class ZipkinElasticsearchStorageAutoConfiguration {
    
      @Bean
      @ConditionalOnMissingBean
      StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
        return esHttpBuilder.build();
      }
    
      @Bean
      ElasticsearchStorage.Builder esHttpBuilder(
          ZipkinElasticsearchStorageProperties elasticsearch,
          @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
          @Value("${zipkin.query.lookback:86400000}") int namesLookback,
          @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
          @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
        return elasticsearch
            .toBuilder(client)
            .namesLookback(namesLookback)
            .strictTraceId(strictTraceId)
            .searchEnabled(searchEnabled);
      }
      ...
      
    }
    
    

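    The class carries four annotations. Here is what each one does:

    @Configuration
    This Spring annotation marks the class as a configuration class.

    @EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
    This enables classes annotated with @ConfigurationProperties, here ZipkinElasticsearchStorageProperties. That class binds Zipkin's Elasticsearch settings, so each of those properties can be read from the configuration file through it. It also provides the method that creates the ElasticsearchStorage.Builder: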

    public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
        ElasticsearchStorage.Builder builder = ElasticsearchStorage.newBuilder(client);
        if (hosts != null) builder.hosts(hosts);
        return builder
            .index(index)
            .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
            .pipeline(pipeline)
            .maxRequests(maxRequests)
            .indexShards(indexShards)
            .indexReplicas(indexReplicas);
    }
    

    @ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
    This annotation makes the class take effect only when the Zipkin property zipkin.storage.type is set to elasticsearch.

    @ConditionalOnMissingBean(StorageComponent.class)
    This annotation makes the class take effect only when no StorageComponent bean has been created yet.
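
    The combined effect of the two conditional annotations can be approximated in plain Java: a backend is created only when the configured type matches, and only if nothing has been created already. This is a hand-rolled sketch, not how Spring evaluates conditions. StorageSelector and the string values standing in for storage beans are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hand-rolled approximation of @ConditionalOnProperty plus @ConditionalOnMissingBean.
final class StorageSelector {
  // Candidate factories keyed by the value expected in zipkin.storage.type.
  private final Map<String, Supplier<String>> candidates = new LinkedHashMap<>();
  private String selected; // plays the role of the StorageComponent bean

  StorageSelector register(String type, Supplier<String> factory) {
    candidates.put(type, factory);
    return this;
  }

  String select(Map<String, String> properties) {
    String configured = properties.get("zipkin.storage.type");
    for (Map.Entry<String, Supplier<String>> candidate : candidates.entrySet()) {
      boolean propertyMatches = candidate.getKey().equals(configured); // property condition
      boolean beanMissing = selected == null;                          // missing-bean condition
      if (propertyMatches && beanMissing) selected = candidate.getValue().get();
    }
    return selected;
  }
}
```

    Once a storage has been selected, later calls leave it in place even if the property changes, which mirrors the "only if missing" rule.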

    Initialization of the StorageComponent then begins. The bean methods are as follows:

    @Bean
    @ConditionalOnMissingBean
    StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
        return esHttpBuilder.build();
    }
    
    @Bean
    ElasticsearchStorage.Builder esHttpBuilder(
        ZipkinElasticsearchStorageProperties elasticsearch,
        @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
        @Value("${zipkin.query.lookback:86400000}") int namesLookback,
        @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
        @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
      return elasticsearch
          .toBuilder(client)
          .namesLookback(namesLookback)
          .strictTraceId(strictTraceId)
          .searchEnabled(searchEnabled);
    }
    

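    First, an ElasticsearchStorage.Builder bean is created. Its parameters include the OkHttpClient, namesLookback, strictTraceId, and searchEnabled. The method calls toBuilder on ZipkinElasticsearchStorageProperties to produce the ElasticsearchStorage.Builder.

    Next, build() is called on the ElasticsearchStorage.Builder to produce the StorageComponent. The build method is implemented in $AutoValue_ElasticsearchStorage, and it constructs an ElasticsearchStorage, which is a StorageComponent.

    2. Storage Analysis

    The Zipkin server receives the spans reported by clients through the collector and writes them to the storage backend. The receiving method is defined as follows: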

    public void accept(List<Span> spans, Callback<Void> callback) {
        if (spans.isEmpty()) {
            callback.onSuccess(null);
            return;
        }
        metrics.incrementSpans(spans.size());
        List<Span> sampled = sample(spans);
        if (sampled.isEmpty()) {
            callback.onSuccess(null);
            return;
        }
        try {
            record(sampled, acceptSpansCallback(sampled));
            callback.onSuccess(null);
        } catch (RuntimeException e) {
            callback.onError(errorStoringSpans(sampled, e));
            return;
        }
    }
    

    Here record is the method that stores the spans:

    void record(List<Span> sampled, Callback<Void> callback) {
        storage.spanConsumer().accept(sampled).enqueue(callback);
    }
    

    The method first obtains the SpanConsumer from the StorageComponent and then calls its accept method. Let's look at SpanConsumer next.
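
    The accept(...).enqueue(callback) chain in record returns a deferred call that reports its outcome through a callback. A minimal sketch of that pattern follows. SketchCall, SketchCallback, and RecordingCallback are simplified stand-ins invented here, and this version runs the work synchronously on the caller's thread, whereas a real storage call would typically complete asynchronously.

```java
import java.util.concurrent.Callable;

// Simplified stand-in for a callback that receives success or failure.
interface SketchCallback<V> {
  void onSuccess(V value);
  void onError(Throwable t);
}

// Simplified stand-in for a deferred call: nothing runs until enqueue() is invoked.
final class SketchCall<V> {
  private final Callable<V> work;

  SketchCall(Callable<V> work) {
    this.work = work;
  }

  // Runs the work and routes the outcome to the callback (synchronously in this sketch).
  void enqueue(SketchCallback<V> callback) {
    V value;
    try {
      value = work.call();
    } catch (Exception e) {
      callback.onError(e);
      return;
    }
    callback.onSuccess(value);
  }
}

// Example callback that simply remembers what happened.
final class RecordingCallback implements SketchCallback<Void> {
  String outcome = "pending";

  @Override public void onSuccess(Void value) { outcome = "stored"; }
  @Override public void onError(Throwable t) { outcome = "failed: " + t.getMessage(); }
}
```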

    SpanConsumer

    SpanConsumer is the interface for storing spans, defined as follows:

    public interface SpanConsumer {
      Call<Void> accept(List<Span> spans);
    }
    

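    The interface has a single method, accept, which persists the spans it receives. Each storage backend has its own implementation. The SpanConsumer implementations are:


    Figure: SpanConsumer implementations

    Because Zipkin supports four storage options (Elasticsearch, Cassandra, MySQL, and in-memory), SpanConsumer also has four implementations. We focus on the Elasticsearch implementation: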

    public Call<Void> accept(List<Span> spans) {
        if (spans.isEmpty()) return Call.create(null);
        BulkSpanIndexer indexer = new BulkSpanIndexer(this);
        indexSpans(indexer, spans);
        return indexer.newCall();
    }
    
    void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
        for (Span span : spans) {
            long spanTimestamp = span.timestampAsLong();
            long indexTimestamp = 0L; // which index to store this span into
            if (spanTimestamp != 0L) {
                indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
            } else {
                for (int i = 0, length = span.annotations().size(); i < length; i++) {
                    indexTimestamp = span.annotations().get(i).timestamp() / 1000;
                    break;
                }
                if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
            }
            indexer.add(indexTimestamp, span, spanTimestamp);
            if (searchEnabled && !span.tags().isEmpty()) {
                indexer.addAutocompleteValues(indexTimestamp, span);
            }
        }
    }
    

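    Storing a span in Elasticsearch takes two main steps:

    1) Determine the index name. The span's timestamp decides which index it is stored in. Zipkin creates one index per day, so a span reported on 2019-08-04, for example, is stored in the index zipkin:span-2019-08-04;

    2) Call HttpBulkIndexer's newCall method to issue the HTTP request that stores the spans.

    With that, the Zipkin spans are stored in Elasticsearch.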
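
    The index-name step can be sketched as follows. DailyIndexName is a hypothetical helper, not Zipkin's own formatter. It assumes days are cut in UTC and borrows the prefix and separators from the example index name above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: derives a per-day index name from a span timestamp.
final class DailyIndexName {
  // Assumption: the day boundary is taken in UTC.
  private static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

  // Span timestamps are epoch microseconds; 0 means "unknown", so fall back to nowMillis.
  static String forSpan(String prefix, long spanTimestampMicros, long nowMillis) {
    long indexMillis = spanTimestampMicros != 0L
        ? TimeUnit.MICROSECONDS.toMillis(spanTimestampMicros)
        : nowMillis;
    return prefix + ":span-" + DAY.format(Instant.ofEpochMilli(indexMillis));
  }
}
```

    Passing the current time in as a parameter keeps the helper deterministic, which makes the fallback branch easy to exercise.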

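    3. Retrieval Analysis

    3.1 Zipkin's query API

    Zipkin's retrieval model is simple: the backend exposes API endpoints and the frontend calls them. The class that defines the API is ZipkinQueryApiV2, which declares the following endpoints:

    1. Dependency (topology) query endpoint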

      @RequestMapping(
          value = "/dependencies",
          method = RequestMethod.GET,
          produces = APPLICATION_JSON_VALUE)
      public byte[] getDependencies(
          @RequestParam(value = "endTs", required = true) long endTs,
          @Nullable @RequestParam(value = "lookback", required = false) Long lookback)
          throws IOException {
        Call<List<DependencyLink>> call =
            storage.spanStore().getDependencies(endTs, lookback != null ? lookback : defaultLookback);
        return DependencyLinkBytesEncoder.JSON_V1.encodeList(call.execute());
      }
    
    

    2. Endpoint that lists all service names

    @RequestMapping(value = "/services", method = RequestMethod.GET)
    public ResponseEntity<List<String>> getServiceNames() throws IOException {
      List<String> serviceNames = storage.spanStore().getServiceNames().execute();
      serviceCount = serviceNames.size();
      return maybeCacheNames(serviceNames);
    }
    

    3. Span names query endpoint

    @RequestMapping(value = "/spans", method = RequestMethod.GET)
    public ResponseEntity<List<String>> getSpanNames(
        @RequestParam(value = "serviceName", required = true) String serviceName) throws IOException {
      return maybeCacheNames(storage.spanStore().getSpanNames(serviceName).execute());
    }
    

    4. Multi-condition trace query endpoint

    @RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
    public String getTraces(
        @Nullable @RequestParam(value = "serviceName", required = false) String serviceName,
        @Nullable @RequestParam(value = "spanName", required = false) String spanName,
        @Nullable @RequestParam(value = "annotationQuery", required = false) String annotationQuery,
        @Nullable @RequestParam(value = "minDuration", required = false) Long minDuration,
        @Nullable @RequestParam(value = "maxDuration", required = false) Long maxDuration,
        @Nullable @RequestParam(value = "endTs", required = false) Long endTs,
        @Nullable @RequestParam(value = "lookback", required = false) Long lookback,
        @RequestParam(value = "suspectSpan", defaultValue = "0") int suspectSpan,
        @RequestParam(value = "limit", defaultValue = "10") int limit)
        throws IOException {
      QueryRequest queryRequest =
          QueryRequest.newBuilder()
              .serviceName(serviceName)
              .spanName(spanName)
              .parseAnnotationQuery(annotationQuery)
              .minDuration(minDuration)
              .maxDuration(maxDuration)
              .endTs(endTs != null ? endTs : System.currentTimeMillis())
              .lookback(lookback != null ? lookback : defaultLookback)
              .limit(limit).suspectSpan(suspectSpan)
              .build();
    
      List<List<Span>> traces = storage.spanStore().getTraces(queryRequest).execute();
      return new String(writeTraces(SpanBytesEncoder.JSON_V2, traces), UTF_8);
    }
    

    5. Endpoint that looks up a trace by trace ID

    @RequestMapping(
        value = "/trace/{traceIdHex}",
        method = RequestMethod.GET,
        produces = APPLICATION_JSON_VALUE)
    public String getTrace(@PathVariable String traceIdHex, WebRequest request) throws IOException {
      List<Span> trace = storage.spanStore().getTrace(traceIdHex).execute();
      if (trace.isEmpty()) throw new TraceNotFoundException(traceIdHex);
      return new String(SpanBytesEncoder.JSON_V2.encodeList(trace), UTF_8);
    }
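
    From the caller's side, a request to the /traces endpoint might be assembled along these lines. TraceQueryUrl is a hypothetical client-side helper. The base URL is supplied by the caller, since the listings above do not show the path prefix the controller is mounted under.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical client-side helper that assembles a GET URL for the /traces endpoint.
final class TraceQueryUrl {
  static String build(String baseUrl, Map<String, String> params) {
    StringJoiner query = new StringJoiner("&");
    for (Map.Entry<String, String> param : params.entrySet()) {
      if (param.getValue() == null) continue; // optional parameters are simply left out
      query.add(encode(param.getKey()) + "=" + encode(param.getValue()));
    }
    String url = baseUrl + "/traces";
    return query.length() == 0 ? url : url + "?" + query;
  }

  private static String encode(String value) {
    return URLEncoder.encode(value, StandardCharsets.UTF_8);
  }
}
```

    Leaving a parameter out lets the server fall back to its defaults, for example limit defaulting to 10 in the listing above.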
    

    3.2 Retrieval flow

    SpanStore is the interface for retrieving spans, defined as follows:

    public interface SpanStore {
      Call<List<List<Span>>> getTraces(QueryRequest request);
      
      Call<List<Span>> getTrace(String traceId);
      
      Call<List<String>> getSpanNames(String serviceName);
      
      Call<List<DependencyLink>> getDependencies(long endTs, long lookback);
    }
    

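    SpanStore provides four methods:
    1) Query traces by search criteria (QueryRequest): getTraces(QueryRequest request);

    2) Look up a trace by trace ID: getTrace(String traceId);

    3) Get span names by service name: getSpanNames(String serviceName);

    4) Get the dependency links within a time window, given its end time and lookback: getDependencies(long endTs, long lookback);

    Likewise, SpanStore has a different implementation for each storage backend. The SpanStore implementations are:


    Figure: SpanStore implementations

    We again focus on the Elasticsearch implementation. Its retrieval logic breaks down into the following steps:
    1) Build the request Filters. A query with a single condition yields one entry in Filters; a query with several conditions yields several entries;
    2) Build the Aggregation;
    3) Determine the target indices, that is, which indices to search;
    4) Build the SearchRequest from the Filters, Aggregation, and indices;
    5) Execute the SearchRequest to retrieve the data.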
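
    As a rough illustration of how those pieces combine, the sketch below assembles a search path and JSON body by hand. SketchSearchRequest is hypothetical and is not Zipkin's SearchRequest. The JSON follows the general shape of an Elasticsearch bool filter with a terms aggregation, but the exact body Zipkin sends may differ, and values are assumed to need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical builder: filters + aggregation + target indices become one search request.
final class SketchSearchRequest {
  private final List<String> indices;
  private final List<String> filters = new ArrayList<>();
  private String aggregation = "";

  SketchSearchRequest(List<String> indices) {
    this.indices = indices;
  }

  SketchSearchRequest addRange(String field, long from, long to) {
    filters.add("{\"range\":{\"" + field + "\":{\"gte\":" + from + ",\"lte\":" + to + "}}}");
    return this;
  }

  SketchSearchRequest addTerm(String field, String value) {
    filters.add("{\"term\":{\"" + field + "\":\"" + value + "\"}}");
    return this;
  }

  SketchSearchRequest termsAggregation(String field, int size) {
    aggregation = ",\"aggs\":{\"" + field + "_agg\":{\"terms\":{\"field\":\""
        + field + "\",\"size\":" + size + "}}}";
    return this;
  }

  // Search endpoint path: the target indices are comma-joined.
  String path() {
    return "/" + String.join(",", indices) + "/_search";
  }

  // Request body: every filter goes into a bool/filter clause.
  // size is 0 because this sketch only asks for aggregation buckets, not hits.
  String body() {
    return "{\"size\":0,\"query\":{\"bool\":{\"filter\":[" + String.join(",", filters) + "]}}"
        + aggregation + "}";
  }
}
```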

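    The following sections examine a few specific methods in detail:

    1. Retrieving traces by multiple query conditions
    1) Create Filters containing the query conditions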

    // Add the time-range filter
    SearchRequest.Filters filters = new SearchRequest.Filters();
    filters.addRange("timestamp_millis", beginMillis, endMillis);
      
    // Add the service-name filter
    if (request.serviceName() != null) {
        filters.addTerm("localEndpoint.serviceName", request.serviceName());
    }
    
    // Add the span-name filter
    if (request.spanName() != null) {
        filters.addTerm("name", request.spanName());
    }
    
    // Add the annotation filters (there may be several annotations)
    for (Map.Entry<String, String> kv : request.annotationQuery().entrySet()) {
        if (kv.getValue().isEmpty()) {
            filters.addTerm("_q", kv.getKey());
        } else {
            filters.addTerm("_q", kv.getKey() + "=" + kv.getValue());
        }
    }
    
    // Add the duration filter
    if (request.minDuration() != null) {
        filters.addRange("duration", request.minDuration(), request.maxDuration());
    }
    
    
    2) Create the Aggregation

    // Bucket by traceId and order the buckets by each trace's earliest timestamp, newest first
    Aggregation traceIdTimestamp =
        Aggregation.terms("traceId", request.limit())
            .addSubAggregation(Aggregation.min("timestamp_millis"))
            .orderBy("timestamp_millis", "desc");
    

    3) Determine the target indices

    List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
    if (indices.isEmpty()) return Call.emptyList();
    

    4) Build the SearchRequest from the filters, aggregation, and indices

    // Build the SearchRequest
    SearchRequest esRequest =
        SearchRequest.create(indices).filters(filters).addAggregation(traceIdTimestamp);
    

    5) Execute the request to retrieve the traces

    HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.KEYS);
    
    Call<List<List<Span>>> result =
          traceIdsCall.flatMap(new GetSpansByTraceId(search, indices)).map(groupByTraceId);
          
    return strictTraceId ? result.map(StrictTraceId.filterTraces(request)) : result;
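
    The final grouping step takes the flat list of spans fetched for the matching trace IDs and turns it into one list per trace. A minimal sketch of such a grouping follows. FlatSpan and TraceGrouping are simplified stand-ins invented here, not the groupByTraceId mapper used in the listing above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a span: only the trace ID and name matter here.
final class FlatSpan {
  final String traceId;
  final String name;

  FlatSpan(String traceId, String name) {
    this.traceId = traceId;
    this.name = name;
  }
}

final class TraceGrouping {
  // Turns a flat list of spans into one list per trace, keeping first-seen trace order.
  static List<List<FlatSpan>> groupByTraceId(List<FlatSpan> spans) {
    Map<String, List<FlatSpan>> byTraceId = new LinkedHashMap<>();
    for (FlatSpan span : spans) {
      byTraceId.computeIfAbsent(span.traceId, id -> new ArrayList<>()).add(span);
    }
    return new ArrayList<>(byTraceId.values());
  }
}
```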
    

    2. Retrieving service names

    public Call<List<String>> getServiceNames() {
      if (!searchEnabled) return Call.emptyList();
    
      long endMillis = System.currentTimeMillis();
      long beginMillis = endMillis - namesLookback;
    
      List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
      if (indices.isEmpty()) return Call.emptyList();
    
      // Service name queries include both local and remote endpoints. This is different than
      // Span name, as a span name can only be on a local endpoint.
      SearchRequest.Filters filters = new SearchRequest.Filters();
      filters.addRange("timestamp_millis", beginMillis, endMillis);
      SearchRequest request =
          SearchRequest.create(indices)
              .filters(filters)
              .addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
              .addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
      return search.newCall(request, BodyConverters.KEYS);
    }
    

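    As you can see, the logic is essentially the same as in "Retrieving traces by multiple query conditions".

    Wishing you smooth work and happy days!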
