Zipkin, a popular distributed tracing solution, supports multiple storage backends, including Elasticsearch, Cassandra, and MySQL. How does it support these backends so flexibly, and how does it store and retrieve trace data efficiently? These questions are worth digging into.
Zipkin's simplified architecture is shown in Figure 1. Each service reports trace data to the Zipkin server through the Zipkin client SDK; when the server-side Collector receives the trace data, it persists it to the database.
Figure 1: Zipkin architecture
The reporting and storage path involves several Zipkin classes and components, mainly Collector, StorageComponent, SpanStore, SpanConsumer and their subclasses. The core storage/retrieval class diagram is shown in Figure 2.
Figure 2: Zipkin storage/retrieval class diagram
1. Storage Component
Zipkin's data read/write path breaks down into three parts: the underlying storage component, data retrieval, and data storage. Both reads and writes depend on the underlying storage. The three parts are analyzed in turn below.
1.1 Storage component initialization
StorageComponent is the core storage and retrieval component of Zipkin. It exposes the interfaces for storing and querying spans and aggregations.
StorageComponent definition
StorageComponent is an abstract class, defined as follows:
public abstract class StorageComponent extends Component {
  public abstract SpanStore spanStore();

  public abstract SpanConsumer spanConsumer();

  public static abstract class Builder {
    public abstract Builder strictTraceId(boolean strictTraceId);
    public abstract Builder searchEnabled(boolean searchEnabled);
    public abstract StorageComponent build();
  }
}
StorageComponent exposes two accessors, one returning a SpanStore and one returning a SpanConsumer. Which concrete SpanStore and SpanConsumer you get depends on the underlying storage backend.
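To make the two accessors concrete, here is a minimal usage sketch based on the zipkin2 storage API, using the in-memory backend as a stand-in for any of the four; the empty span list and the query parameters are illustrative only, not Zipkin's own wiring.

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import zipkin2.Span;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.QueryRequest;
import zipkin2.storage.StorageComponent;

public class StorageComponentSketch {
  public static void main(String[] args) throws IOException {
    // any backend works here; the in-memory one needs no external services
    StorageComponent storage = InMemoryStorage.newBuilder().build();

    // write path: the collector hands spans to the SpanConsumer
    List<Span> spans = Collections.emptyList(); // spans received from clients would go here
    storage.spanConsumer().accept(spans).execute();

    // read path: the query API reads through the SpanStore
    QueryRequest request = QueryRequest.newBuilder()
        .endTs(System.currentTimeMillis())
        .lookback(86_400_000L) // one day, matching zipkin's default lookback
        .limit(10)
        .build();
    List<List<Span>> traces = storage.spanStore().getTraces(request).execute();
    System.out.println("traces found: " + traces.size());
  }
}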
StorageComponent initialization process
Zipkin supports four storage backends: Elasticsearch, Cassandra, MySQL, and in-memory, so StorageComponent can be initialized as any of the four. Let's walk through the Elasticsearch initialization.
Figure 3: ES storage initialization
The main class for Elasticsearch storage initialization is ZipkinElasticsearchStorageAutoConfiguration, defined as follows:
@Configuration
@EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
@ConditionalOnMissingBean(StorageComponent.class)
// intentionally public for import by zipkin-autoconfigure-storage-elasticsearch-aws
public class ZipkinElasticsearchStorageAutoConfiguration {

  @Bean
  @ConditionalOnMissingBean
  StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
    return esHttpBuilder.build();
  }

  @Bean
  ElasticsearchStorage.Builder esHttpBuilder(
      ZipkinElasticsearchStorageProperties elasticsearch,
      @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
      @Value("${zipkin.query.lookback:86400000}") int namesLookback,
      @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
      @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
    return elasticsearch
        .toBuilder(client)
        .namesLookback(namesLookback)
        .strictTraceId(strictTraceId)
        .searchEnabled(searchEnabled);
  }
  ...
}
The class carries four annotations; let's look at what each one does.
@Configuration
This is Spring's configuration annotation; it marks the class as a configuration class.
@EnableConfigurationProperties(ZipkinElasticsearchStorageProperties.class)
This activates a class annotated with @ConfigurationProperties, here ZipkinElasticsearchStorageProperties, Zipkin's Elasticsearch configuration class. It exposes the Elasticsearch-related properties from the configuration file and also provides a method that initializes an ElasticsearchStorage.Builder:
public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
  ElasticsearchStorage.Builder builder = ElasticsearchStorage.newBuilder(client);
  if (hosts != null) builder.hosts(hosts);
  return builder
      .index(index)
      .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
      .pipeline(pipeline)
      .maxRequests(maxRequests)
      .indexShards(indexShards)
      .indexReplicas(indexReplicas);
}
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "elasticsearch")
This annotation makes the class take effect only when the configuration property zipkin.storage.type is set to elasticsearch.
@ConditionalOnMissingBean(StorageComponent.class)
This annotation makes the class take effect only when no StorageComponent bean has been created yet.
With the conditions satisfied, StorageComponent initialization begins. The bean-producing methods are:
@Bean
@ConditionalOnMissingBean
StorageComponent storage(ElasticsearchStorage.Builder esHttpBuilder) {
  return esHttpBuilder.build();
}

@Bean
ElasticsearchStorage.Builder esHttpBuilder(
    ZipkinElasticsearchStorageProperties elasticsearch,
    @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
    @Value("${zipkin.query.lookback:86400000}") int namesLookback,
    @Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
    @Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled) {
  return elasticsearch
      .toBuilder(client)
      .namesLookback(namesLookback)
      .strictTraceId(strictTraceId)
      .searchEnabled(searchEnabled);
}
An ElasticsearchStorage.Builder bean is created first; its inputs include the OkHttpClient, namesLookback, strictTraceId, and searchEnabled. The method calls ZipkinElasticsearchStorageProperties.toBuilder to produce the ElasticsearchStorage.Builder. Then the builder's build() method, implemented in $AutoValue_ElasticsearchStorage, is called to produce the StorageComponent. This yields an ElasticsearchStorage, i.e. the StorageComponent.
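For reference, here is a rough sketch of what that wiring amounts to if done by hand, using only builder methods that appear in the code above; the package name, host, and index values are assumptions for illustration, and option names can differ across Zipkin versions.

import java.util.Collections;
import okhttp3.OkHttpClient;
import zipkin2.elasticsearch.ElasticsearchStorage; // assumed package of the ES storage module
import zipkin2.storage.StorageComponent;

public class EsStorageBuilderSketch {
  public static void main(String[] args) {
    OkHttpClient client = new OkHttpClient();
    StorageComponent storage = ElasticsearchStorage.newBuilder(client)
        .hosts(Collections.singletonList("http://localhost:9200")) // illustrative host
        .index("zipkin")          // index prefix, e.g. zipkin:span-2019-08-04
        .strictTraceId(true)
        .searchEnabled(true)
        .build();
    // storage.spanStore() / storage.spanConsumer() now talk to Elasticsearch over HTTP
  }
}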
2. Storage Analysis
The Zipkin server receives spans reported by clients through the Collector and writes them to the storage backend. The receiving method is defined as follows:
public void accept(List<Span> spans, Callback<Void> callback) {
  if (spans.isEmpty()) {
    callback.onSuccess(null);
    return;
  }
  metrics.incrementSpans(spans.size());

  List<Span> sampled = sample(spans);
  if (sampled.isEmpty()) {
    callback.onSuccess(null);
    return;
  }

  try {
    record(sampled, acceptSpansCallback(sampled));
    callback.onSuccess(null);
  } catch (RuntimeException e) {
    callback.onError(errorStoringSpans(sampled, e));
    return;
  }
}
Here, record is the method that stores the spans:
void record(List<Span> sampled, Callback<Void> callback) {
  storage.spanConsumer().accept(sampled).enqueue(callback);
}
The method first obtains a SpanConsumer from the StorageComponent, then calls its accept method. Let's look at SpanConsumer next.
SpanConsumer
The interface for storing spans is SpanConsumer, defined as follows:
public interface SpanConsumer {
  Call<Void> accept(List<Span> spans);
}
The interface has a single method, accept, which persists the spans it receives. Each storage backend provides its own implementation (the class diagram of SpanConsumer's subclasses is omitted here). Since Zipkin supports four backends (Elasticsearch, Cassandra, MySQL, in-memory), there are four SpanConsumer implementations; we focus on the Elasticsearch one.
public Call<Void> accept(List<Span> spans) {
  if (spans.isEmpty()) return Call.create(null);
  BulkSpanIndexer indexer = new BulkSpanIndexer(this);
  indexSpans(indexer, spans);
  return indexer.newCall();
}
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
  for (Span span : spans) {
    long spanTimestamp = span.timestampAsLong();
    long indexTimestamp = 0L; // which index to store this span into
    if (spanTimestamp != 0L) {
      indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
    } else {
      for (int i = 0, length = span.annotations().size(); i < length; i++) {
        indexTimestamp = span.annotations().get(i).timestamp() / 1000;
        break;
      }
      if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
    }
    indexer.add(indexTimestamp, span, spanTimestamp);
    if (searchEnabled && !span.tags().isEmpty()) {
      indexer.addAutocompleteValues(indexTimestamp, span);
    }
  }
}
The main steps for storing a span in Elasticsearch are:
1) Determine the index name. The span's timestamp decides which index it is stored in. Zipkin creates one index per day: if today is 2019-08-04, spans reported on 2019-08-04 go into the index zipkin:span-2019-08-04 (a simplified sketch of this naming scheme is shown below);
2) Call HttpBulkIndexer's newCall method to issue the HTTP request that stores the spans.
With that, the spans are persisted to Elasticsearch.
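The following is a simplified illustration of the daily index naming, not Zipkin's actual IndexNameFormatter (which also honors a configurable index prefix and date separator); it only shows how a span's timestamp maps to a per-day index name.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class IndexNameSketch {
  // maps an epoch-millis timestamp to the per-day span index, e.g. "zipkin:span-2019-08-04"
  static String dailySpanIndex(String indexPrefix, long timestampMillis) {
    String day = DateTimeFormatter.ofPattern("yyyy-MM-dd")
        .withZone(ZoneOffset.UTC)
        .format(Instant.ofEpochMilli(timestampMillis));
    return indexPrefix + ":span-" + day;
  }

  public static void main(String[] args) {
    System.out.println(dailySpanIndex("zipkin", System.currentTimeMillis()));
  }
}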
3. Retrieval Analysis
3.1 Zipkin's query API
Zipkin's retrieval model is: the backend exposes API endpoints and the frontend calls them. The class defining the API is ZipkinQueryApiV2, which declares the following endpoints:
1. Dependency (topology) query endpoint
@RequestMapping(
    value = "/dependencies",
    method = RequestMethod.GET,
    produces = APPLICATION_JSON_VALUE)
public byte[] getDependencies(
    @RequestParam(value = "endTs", required = true) long endTs,
    @Nullable @RequestParam(value = "lookback", required = false) Long lookback)
    throws IOException {
  Call<List<DependencyLink>> call =
      storage.spanStore().getDependencies(endTs, lookback != null ? lookback : defaultLookback);
  return DependencyLinkBytesEncoder.JSON_V1.encodeList(call.execute());
}
2. Endpoint listing all service names
@RequestMapping(value = "/services", method = RequestMethod.GET)
public ResponseEntity<List<String>> getServiceNames() throws IOException {
List<String> serviceNames = storage.spanStore().getServiceNames().execute();
serviceCount = serviceNames.size();
return maybeCacheNames(serviceNames);
}
3. Endpoint listing span names
@RequestMapping(value = "/spans", method = RequestMethod.GET)
public ResponseEntity<List<String>> getSpanNames(
@RequestParam(value = "serviceName", required = true) String serviceName) throws IOException {
return maybeCacheNames(storage.spanStore().getSpanNames(serviceName).execute());
}
4. Multi-condition trace query endpoint
@RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
public String getTraces(
@Nullable @RequestParam(value = "serviceName", required = false) String serviceName,
@Nullable @RequestParam(value = "spanName", required = false) String spanName,
@Nullable @RequestParam(value = "annotationQuery", required = false) String annotationQuery,
@Nullable @RequestParam(value = "minDuration", required = false) Long minDuration,
@Nullable @RequestParam(value = "maxDuration", required = false) Long maxDuration,
@Nullable @RequestParam(value = "endTs", required = false) Long endTs,
@Nullable @RequestParam(value = "lookback", required = false) Long lookback,
@RequestParam(value = "suspectSpan", defaultValue = "0") int suspectSpan,
@RequestParam(value = "limit", defaultValue = "10") int limit)
throws IOException {
QueryRequest queryRequest =
QueryRequest.newBuilder()
.serviceName(serviceName)
.spanName(spanName)
.parseAnnotationQuery(annotationQuery)
.minDuration(minDuration)
.maxDuration(maxDuration)
.endTs(endTs != null ? endTs : System.currentTimeMillis())
.lookback(lookback != null ? lookback : defaultLookback)
.limit(limit).suspectSpan(suspectSpan)
.build();
List<List<Span>> traces = storage.spanStore().getTraces(queryRequest).execute();
return new String(writeTraces(SpanBytesEncoder.JSON_V2, traces), UTF_8);
}
5. Endpoint querying a trace by trace ID
@RequestMapping(
    value = "/trace/{traceIdHex}",
    method = RequestMethod.GET,
    produces = APPLICATION_JSON_VALUE)
public String getTrace(@PathVariable String traceIdHex, WebRequest request) throws IOException {
  List<Span> trace = storage.spanStore().getTrace(traceIdHex).execute();
  if (trace.isEmpty()) throw new TraceNotFoundException(traceIdHex);
  return new String(SpanBytesEncoder.JSON_V2.encodeList(trace), UTF_8);
}
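To see these endpoints from the caller's side, here is a small client-side sketch (not part of Zipkin's code) that exercises the multi-condition /api/v2/traces endpoint above with OkHttp; the host, port, and parameter values are illustrative.

import java.io.IOException;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class QueryApiClientSketch {
  public static void main(String[] args) throws IOException {
    HttpUrl url = HttpUrl.parse("http://localhost:9411/api/v2/traces").newBuilder()
        .addQueryParameter("serviceName", "frontend") // illustrative service name
        .addQueryParameter("endTs", String.valueOf(System.currentTimeMillis()))
        .addQueryParameter("lookback", "86400000")    // one day
        .addQueryParameter("limit", "10")
        .build();
    Request request = new Request.Builder().url(url).build();
    try (Response response = new OkHttpClient().newCall(request).execute()) {
      // the body is the JSON_V2-encoded list of traces produced by getTraces above
      System.out.println(response.body().string());
    }
  }
}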
3.2 Retrieval flow analysis
The interface for retrieving spans is SpanStore, defined as follows:
public interface SpanStore {
  Call<List<List<Span>>> getTraces(QueryRequest request);
  Call<List<Span>> getTrace(String traceId);
  Call<List<String>> getSpanNames(String serviceName);
  Call<List<DependencyLink>> getDependencies(long endTs, long lookback);
}
SpanStore declares four methods:
1) getTraces(QueryRequest request): query traces by the conditions in a QueryRequest;
2) getTrace(String traceId): fetch a single trace by trace ID;
3) getSpanNames(String serviceName): list the span names of a service;
4) getDependencies(long endTs, long lookback): fetch the dependency links (topology) within a time range.
Like SpanConsumer, SpanStore has a different implementation per storage backend (the class diagram of its implementations is omitted here). A sketch exercising these query methods follows.
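Here is a minimal sketch that exercises the remaining query methods against the in-memory SpanStore; the trace ID and service name are made-up values, and the backend-specific stores honor the same contract.

import java.io.IOException;
import java.util.List;
import zipkin2.DependencyLink;
import zipkin2.Span;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.SpanStore;

public class SpanStoreSketch {
  public static void main(String[] args) throws IOException {
    SpanStore store = InMemoryStorage.newBuilder().build().spanStore();

    List<Span> trace = store.getTrace("216a2aea45d08fc9").execute();      // by trace id (made up)
    List<String> spanNames = store.getSpanNames("frontend").execute();    // span names of one service
    List<DependencyLink> links =
        store.getDependencies(System.currentTimeMillis(), 86_400_000L).execute(); // topology links
    System.out.println(trace.size() + " spans, " + spanNames.size() + " span names, "
        + links.size() + " dependency links");
  }
}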
We again focus on the Elasticsearch implementation. Its retrieval logic breaks down into the following steps:
1) Build the request Filters: a single query condition yields one filter entry, multiple conditions yield multiple entries;
2) Build the Aggregation;
3) Determine the target indexes, i.e. which indexes to search;
4) Build a SearchRequest from the filters, the aggregation and the indexes;
5) Execute the SearchRequest and retrieve the data.
Let's look at a few specific endpoints in detail.
1. Retrieving traces by multiple query conditions
1) Create the Filters containing the query conditions
// add the "time range" filter
SearchRequest.Filters filters = new SearchRequest.Filters();
filters.addRange("timestamp_millis", beginMillis, endMillis);

// add the "service name" filter
if (request.serviceName() != null) {
  filters.addTerm("localEndpoint.serviceName", request.serviceName());
}

// add the "span name" filter
if (request.spanName() != null) {
  filters.addTerm("name", request.spanName());
}

// add the annotation filters (there may be several annotations)
for (Map.Entry<String, String> kv : request.annotationQuery().entrySet()) {
  if (kv.getValue().isEmpty()) {
    filters.addTerm("_q", kv.getKey());
  } else {
    filters.addTerm("_q", kv.getKey() + "=" + kv.getValue());
  }
}

// add the "duration" filter
if (request.minDuration() != null) {
  filters.addRange("duration", request.minDuration(), request.maxDuration());
}
2) Create the Aggregation
// aggregate by traceId, keeping the minimum timestamp_millis per trace and ordering by it
Aggregation traceIdTimestamp = Aggregation.terms("traceId", request.limit())
    .addSubAggregation(Aggregation.min("timestamp_millis"))
    .orderBy("timestamp_millis", "desc");
3) Determine the target indexes
List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
if (indices.isEmpty()) return Call.emptyList();
4) Build the SearchRequest from the filters, the aggregation and the indexes
// build the SearchRequest
SearchRequest esRequest = SearchRequest.create(indices)
    .filters(filters)
    .addAggregation(traceIdTimestamp);
5) Execute the request and retrieve the traces
HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.KEYS);
Call<List<List<Span>>> result =
    traceIdsCall.flatMap(new GetSpansByTraceId(search, indices)).map(groupByTraceId);
return strictTraceId ? result.map(StrictTraceId.filterTraces(request)) : result;
2. Retrieving service names
public Call<List<String>> getServiceNames() {
  if (!searchEnabled) return Call.emptyList();

  long endMillis = System.currentTimeMillis();
  long beginMillis = endMillis - namesLookback;

  List<String> indices = indexNameFormatter.formatTypeAndRange(SPAN, beginMillis, endMillis);
  if (indices.isEmpty()) return Call.emptyList();

  // Service name queries include both local and remote endpoints. This is different than
  // Span name, as a span name can only be on a local endpoint.
  SearchRequest.Filters filters = new SearchRequest.Filters();
  filters.addRange("timestamp_millis", beginMillis, endMillis);
  SearchRequest request =
      SearchRequest.create(indices)
          .filters(filters)
          .addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
          .addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
  return search.newCall(request, BodyConverters.KEYS);
}
As you can see, the logic is essentially the same as the multi-condition trace search above.
Wishing you smooth work and happy days!