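Preface
Our company has recently been looking into full-link (end-to-end) monitoring. A request that enters through the service gateway flows through business modules owned by many different teams, so a problem in any one of them affects the result of the whole call chain (response time, response payload, exception handling, and so on). We therefore need a mechanism that monitors a request across its entire path. Our first idea was to build directly on pinpoint's injection plugins, but we later found that, because of issues such as the sampling rate, it could not be used in production, so we decided to build our own system to our own standard. We also wanted to adopt the opentracing standard, which is what this article covers.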
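A brief introduction to opentracing
For the details of opentracing, follow the related links in the preface to the opentracing website and GitHub; this section gives only a brief introduction. opentracing mainly consists of the following components: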
Span
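A Span represents one unit of work in a distributed call chain, for example a dubbo provider handling a call or the server side of an HTTP call. Its boundary starts when a request enters the service and ends where the request leaves the current service again through some channel (HTTP, dubbo, and so on). A span usually records information about what happens inside that unit, for example:
- log entries
- tags
- start and finish times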
SpanContext
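A SpanContext represents the context of a span; spans and span contexts are essentially in a one-to-one relationship. The context stores the information that has to cross process boundaries, for example:
- spanId: the id of the current span
- traceId: the id of the trace this span belongs to (the unique id of this call chain)
- baggage: other information that can travel across multiple call units
The SpanContext can be passed to the downstream part of the call chain through some medium, so that downstream services can act on it (for example, to generate child span ids or to carry inherited information into their logs).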
Tracer
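Tracer is a general-purpose interface that acts as the hub of the opentracing standard. It has the following responsibilities:
- creating and starting a span
- extracting a SpanContext from, and injecting one into, some medium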
Carrier
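A Carrier is the medium that carries a SpanContext. For example, an HTTP call scenario would have an HttpCarrier, and a dubbo call scenario would have a corresponding DubboCarrier.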
Formatter
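This interface holds the scenario-specific logic for serializing and deserializing the context. For example, an HttpCarrier would normally come with a matching HttpFormatter. The Tracer delegates injection and extraction to the Formatter.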
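To make the Carrier and Formatter roles concrete, here is a minimal sketch that uses only the JDK. `SimpleContext`, `MapFormatter` and the `bg-` baggage prefix are hypothetical names chosen for illustration; the `tcid` and `spid` keys reuse the serialization keys declared in StarAtlasSpanContext later in this article. The carrier is a plain string map, which is roughly what a set of HTTP headers looks like.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified context: only the fields that cross a process boundary. */
final class SimpleContext {
    final String traceId;
    final String spanId;
    final Map<String, String> baggage;

    SimpleContext(String traceId, String spanId, Map<String, String> baggage) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.baggage = new HashMap<>(baggage);
    }
}

/** Hypothetical formatter: writes a context into a string map (the carrier) and reads it back. */
final class MapFormatter {
    static final String TRACE_ID_KEY = "tcid";
    static final String SPAN_ID_KEY = "spid";
    static final String BAGGAGE_PREFIX = "bg-"; // assumed prefix, not taken from the article

    /** Serializes the ids and every baggage item into the carrier. */
    static void inject(SimpleContext context, Map<String, String> carrier) {
        carrier.put(TRACE_ID_KEY, context.traceId);
        carrier.put(SPAN_ID_KEY, context.spanId);
        for (Map.Entry<String, String> item : context.baggage.entrySet()) {
            carrier.put(BAGGAGE_PREFIX + item.getKey(), item.getValue());
        }
    }

    /** Returns null when the carrier holds no ids, meaning the receiver should start a new trace. */
    static SimpleContext extract(Map<String, String> carrier) {
        String traceId = carrier.get(TRACE_ID_KEY);
        String spanId = carrier.get(SPAN_ID_KEY);
        if (traceId == null || spanId == null) {
            return null;
        }
        Map<String, String> baggage = new HashMap<>();
        for (Map.Entry<String, String> entry : carrier.entrySet()) {
            if (entry.getKey().startsWith(BAGGAGE_PREFIX)) {
                baggage.put(entry.getKey().substring(BAGGAGE_PREFIX.length()), entry.getValue());
            }
        }
        return new SimpleContext(traceId, spanId, baggage);
    }
}
```

On the calling side the client would run `MapFormatter.inject(context, headers)` before sending the request, and the server would call `MapFormatter.extract(headers)` on arrival to decide whether to continue an existing trace or start a new one.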
ScopeManager
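This component was added after version 0.30. Through it you can obtain the span that is currently active in the current thread, and activate spans that are not yet active. In some scenarios we create several spans in one thread, but only one span is active in a given thread at any moment. The other spans may be in one of the following states:
- waiting for a child span to finish
- waiting on some blocking method
- created but not yet started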
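Beyond the components above, a distributed full-link monitoring framework also needs a reporter component that prints or reports key trace events (such as span creation and finish). Only once this information has been processed can the trace be visualized and actually monitored.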
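The article does not show a reporter, so the following is only a sketch of what such a component could look like, using the JDK alone. `SpanReporter` and `InMemorySpanReporter` are hypothetical names, and the line format is an assumption; a real reporter would more likely write to a log file or send data to a collector.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback interface; the article does not define its reporter. */
interface SpanReporter {
    void onStart(String traceId, String spanId, String operationName);

    void onFinish(String traceId, String spanId, String operationName, long durationMillis);
}

/** Keeps report lines in memory so that they can be inspected afterwards. */
final class InMemorySpanReporter implements SpanReporter {
    private final List<String> lines = new ArrayList<>();

    @Override
    public synchronized void onStart(String traceId, String spanId, String operationName) {
        lines.add("START " + traceId + " " + spanId + " " + operationName);
    }

    @Override
    public synchronized void onFinish(String traceId, String spanId, String operationName,
                                      long durationMillis) {
        lines.add("FINISH " + traceId + " " + spanId + " " + operationName + " "
                + durationMillis + "ms");
    }

    /** Returns a copy so that callers cannot modify the internal list. */
    synchronized List<String> lines() {
        return new ArrayList<>(lines);
    }
}
```

A span implementation would call `onStart` from its constructor and `onFinish` from `finish()`, which is where the commented-out `SpanExtensionFactory.logStartedSpan(this)` hook sits in the Span listing.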
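A simple implementation approach
This article first walks through the key logic of the core components (Span, SpanContext, Tracer, and ScopeManager). It also borrows some ideas from sofa-tracer, such as the rules for generating spanId and traceId; see sofa-tracer for the details. Our project is called StarAtlas (星图), so all our components use that name as a prefix. Package names and author/date comments are omitted here. The listings also rely on a few project helpers that are not shown (AssertUtils, StringUtils, TraceIdGenerator, and StarAtlasSpanReferenceRelationship).
Let's look at Span first: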
import io.opentracing.Span;
import io.opentracing.SpanContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * StarAtlasSpan
 * <p>
 * the implementation of span
 */
public class StarAtlasSpan implements Span {
    private StarAtlasTracer starAtlasTracer;
    private long startTime;
    private List<StarAtlasSpanReferenceRelationship> spanReferences;
    private String operationName;
    private StarAtlasSpanContext spanContext;
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public StarAtlasSpan(StarAtlasTracer starAtlasTracer, long startTime,
                         List<StarAtlasSpanReferenceRelationship> spanReferences,
                         String operationName, StarAtlasSpanContext spanContext,
                         Map<String, ?> tags) {
        AssertUtils.notNull(starAtlasTracer);
        AssertUtils.notNull(spanContext);
        this.starAtlasTracer = starAtlasTracer;
        this.startTime = startTime;
        // defensive copy so later changes in the builder do not leak into the span
        this.spanReferences = spanReferences != null
                ? new ArrayList<StarAtlasSpanReferenceRelationship>(spanReferences) : null;
        this.operationName = operationName;
        this.spanContext = spanContext;
        // tags
        this.setTags(tags);
        // report extension, to be implemented
        //SpanExtensionFactory.logStartedSpan(this);
    }

    @Override
    public SpanContext context() {
        return this.spanContext;
    }

    // The methods below are stubs that are not implemented yet.
    // They return this instead of null so that chained calls do not fail.

    @Override
    public Span setTag(String s, String s1) {
        return this;
    }

    @Override
    public Span setTag(String s, boolean b) {
        return this;
    }

    @Override
    public Span setTag(String s, Number number) {
        return this;
    }

    @Override
    public Span log(Map<String, ?> map) {
        return this;
    }

    @Override
    public Span log(long l, Map<String, ?> map) {
        return this;
    }

    @Override
    public Span log(String s) {
        return this;
    }

    @Override
    public Span log(long l, String s) {
        return this;
    }

    @Override
    public Span setBaggageItem(String s, String s1) {
        return this;
    }

    @Override
    public String getBaggageItem(String s) {
        // baggage is not stored yet, so there is never an item to return
        return null;
    }

    @Override
    public Span setOperationName(String s) {
        return this;
    }

    @Override
    public void finish() {
        // to be implemented: record the finish time and report the span
    }

    @Override
    public void finish(long l) {
        // to be implemented: record the given finish time and report the span
    }

    private void setTags(Map<String, ?> tags) {
        if (tags == null || tags.isEmpty()) {
            return;
        }
        for (Map.Entry<String, ?> entry : tags.entrySet()) {
            String key = entry.getKey();
            if (StringUtils.isBlank(key)) {
                continue;
            }
            Object value = entry.getValue();
            if (value == null) {
                continue;
            }
            if (value instanceof String) {
                // at initialization, tags can also be used to tell client spans from server spans
                this.setTag(key, (String) value);
            } else if (value instanceof Boolean) {
                this.setTag(key, (Boolean) value);
            } else if (value instanceof Number) {
                this.setTag(key, (Number) value);
            } else {
                logger.error("Span tags unsupported type [" + value.getClass() + "]");
            }
        }
    }
}
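This part is fairly simple: it creates a Span and injects some information into it. The code that would report the span start is commented out, and the remaining Span methods are stubs for now. The constructor takes a list of StarAtlasSpanReferenceRelationship. This class identifies the relationship between this Span and other Spans, and is used to maintain the parent-child relationship when a Span is created.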
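Since the tag and finish methods above are stubs, here is a hedged sketch of how they could be filled in. `SimpleSpan` is a hypothetical, dependency-free class written for illustration; it does not implement the opentracing `Span` interface, and it collapses the three `setTag` overloads into one method that takes an `Object`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, dependency-free span used only to illustrate tag storage and finish(). */
final class SimpleSpan {
    private final String operationName;
    private final long startMillis;
    private final Map<String, Object> tags = new LinkedHashMap<>();
    private boolean finished;
    private long durationMillis;

    SimpleSpan(String operationName, long startMillis) {
        this.operationName = operationName;
        this.startMillis = startMillis;
    }

    /** Stores the tag and returns this for chaining; blank keys and null values are skipped. */
    SimpleSpan setTag(String key, Object value) {
        if (key != null && !key.trim().isEmpty() && value != null) {
            tags.put(key, value);
        }
        return this;
    }

    /** Records the duration on the first call; later calls are ignored. */
    void finish(long finishMillis) {
        if (finished) {
            return;
        }
        finished = true;
        durationMillis = finishMillis - startMillis;
    }

    boolean isFinished() {
        return finished;
    }

    long durationMillis() {
        return durationMillis;
    }

    String operationName() {
        return operationName;
    }

    /** Read-only view of the tags in insertion order. */
    Map<String, Object> tags() {
        return Collections.unmodifiableMap(tags);
    }
}
```

Making `finish` idempotent is a design choice in this sketch: it keeps a span from being reported twice if both a scope with finishOnClose and the caller finish it.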
Next, let's look at SpanContext:
import io.opentracing.SpanContext;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * StarAtlasSpanContext
 * <p>
 * the span context implementation to store span information
 */
public class StarAtlasSpanContext implements SpanContext {
    // spanId separator
    public static final String RPC_ID_SEPARATOR = ".";

    //======================== keys used for the serialized data ========================
    private static final String TRACE_ID_KEY = "tcid";
    private static final String SPAN_ID_KEY = "spid";
    private static final String PARENT_SPAN_ID_KEY = "pspid";
    private static final String SAMPLE_KEY = "sample";

    private AtomicInteger childContextIndex = new AtomicInteger(0);
    private String spanId;
    private String traceId;
    private String parentId;
    /**
     * not sampled by default
     */
    private boolean isSampled = false;

    public StarAtlasSpanContext(String traceId, String spanId, String parentId) {
        // not sampled by default
        this(traceId, spanId, parentId, false);
    }

    public StarAtlasSpanContext(String traceId, String spanId, String parentId, boolean isSampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        // when no parent id is given, derive it from the span id
        this.parentId = StringUtils.isBlank(parentId) ? this.genParentSpanId(spanId) : parentId;
        this.isSampled = isSampled;
    }

    @Override
    public Iterable<Map.Entry<String, String>> baggageItems() {
        // baggage is not implemented yet; return an empty view instead of null
        // so that callers can iterate safely
        return Collections.<String, String>emptyMap().entrySet();
    }

    /**
     * Get the id for the next child context
     *
     * @return the next child spanId
     */
    public String nextChildContextId() {
        return this.spanId + RPC_ID_SEPARATOR + childContextIndex.incrementAndGet();
    }

    public String getSpanId() {
        return spanId;
    }

    public void setSpanId(String spanId) {
        this.spanId = spanId;
    }

    public String getTraceId() {
        return traceId;
    }

    public void setTraceId(String traceId) {
        this.traceId = traceId;
    }

    public String getParentId() {
        return parentId;
    }

    public void setParentId(String parentId) {
        this.parentId = parentId;
    }

    public boolean isSampled() {
        return isSampled;
    }

    public void setSampled(boolean sampled) {
        isSampled = sampled;
    }

    /**
     * Derive the parent span id by dropping the last segment, e.g. "0.1.2" becomes "0.1";
     * an id without a separator has no parent
     */
    private String genParentSpanId(String spanId) {
        return (StringUtils.isBlank(spanId) || spanId.lastIndexOf(RPC_ID_SEPARATOR) < 0)
                ? StringUtils.EMPTY_STRING
                : spanId.substring(0, spanId.lastIndexOf(RPC_ID_SEPARATOR));
    }
}
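Like Span, this class mostly stores data such as spanId and traceId (baggage storage is not implemented yet). It also has a couple of more distinctive methods: one derives the parent spanId of the current context, and one generates the id of the next child span.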
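The id scheme is easier to see in isolation. The sketch below re-implements the two id helpers with the JDK only; `SpanIdNode` is a hypothetical name, while the dot separator and the counter-based child ids follow the listing above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that isolates the dotted span id scheme used by StarAtlasSpanContext. */
final class SpanIdNode {
    static final String SEPARATOR = ".";

    private final String spanId;
    private final AtomicInteger childIndex = new AtomicInteger(0);

    SpanIdNode(String spanId) {
        this.spanId = spanId;
    }

    /** Each call hands out the next child id: "0" yields "0.1", then "0.2", and so on. */
    String nextChildId() {
        return spanId + SEPARATOR + childIndex.incrementAndGet();
    }

    /** Drops the last segment of the id; an id without a separator has no parent. */
    static String parentOf(String spanId) {
        int cut = spanId == null ? -1 : spanId.lastIndexOf(SEPARATOR);
        return cut < 0 ? "" : spanId.substring(0, cut);
    }
}
```

With this scheme the position of a span in the call tree can be read directly from its id: `0.2.1` is the first child of the second child of the root span `0`, and `SpanIdNode.parentOf("0.2.1")` gives `0.2`.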
Next come Scope and ScopeManager:
import io.opentracing.Scope;
import io.opentracing.ScopeManager;
import io.opentracing.Span;

/**
 * StarAtlasScopeManager
 * <p>
 * the scope manager that stores and manages the scope information within a thread
 */
public class StarAtlasScopeManager implements ScopeManager {
    /**
     * the thread local store for the active scope
     */
    final ThreadLocal<StarAtlasScope> scopeThreadLocal = new ThreadLocal<>();

    /**
     * singleton accessor
     *
     * @return the shared scope manager instance
     */
    public static StarAtlasScopeManager getInstance() {
        return StarAtlasScopeManagerSingletonHolder.INSTANCE;
    }

    private StarAtlasScopeManager() {
    }

    /**
     * activate a span in the current thread
     *
     * @param span          the span to activate
     * @param finishOnClose whether the span is finished when the returned scope is closed
     * @return the scope wrapping the newly activated span
     */
    @Override
    public Scope activate(Span span, boolean finishOnClose) {
        if (!checkCanActivate(span)) {
            throw new IllegalStateException("a span cannot be activated more than once");
        }
        return new StarAtlasScope(this, span, finishOnClose);
    }

    /**
     * get the currently active scope
     *
     * @return the active scope of the current thread, or null if there is none
     */
    @Override
    public Scope active() {
        return this.scopeThreadLocal.get();
    }

    /**
     * check whether the span can be activated:
     * if the span already exists in the recover chain of the currently active scope,
     * we know that it has been activated before.
     *
     * @param span the span about to be activated
     * @return false if the span is already on the chain, true otherwise
     */
    private boolean checkCanActivate(Span span) {
        StarAtlasScope scope = (StarAtlasScope) this.active();
        while (scope != null) {
            if (scope.span() == span) {
                return false;
            }
            scope = scope.scopeToRecover;
        }
        return true;
    }

    private static class StarAtlasScopeManagerSingletonHolder {
        private static final StarAtlasScopeManager INSTANCE = new StarAtlasScopeManager();
    }
}
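ScopeManager uses a ThreadLocal to store the currently active Span (wrapped in a Scope). It implements three methods:
- activate: activates a span in the current thread and returns a scope wrapping it
- active: returns the scope that is currently active in this thread
- checkCanActivate: a method of our own. When we activate a span and wrap it in a scope, the scope that was active beforehand is stored in the new scope's scopeToRecover field (see the Scope code that follows). Starting from the currently active scope, we can therefore follow scopeToRecover all the way back to the first scope. So when a span is about to be activated, we can tell whether it is being activated a second time by checking whether it already appears on that chain.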
The Scope code is as follows:
import io.opentracing.Scope;
import io.opentracing.Span;

/**
 * StarAtlasScope
 * <p>
 * StarAtlasScope is a wrapper class for a span.
 * It represents the active span in the current thread
 * and supports close() to deactivate that span.
 */
public class StarAtlasScope implements Scope {
    /**
     * whether to finish the span when the scope is closed
     */
    private final boolean finishOnClose;
    /**
     * the wrapped span
     */
    private final Span span;
    /**
     * scope manager
     */
    private final StarAtlasScopeManager scopeManager;
    /**
     * the scope to recover on close
     */
    final StarAtlasScope scopeToRecover;

    StarAtlasScope(StarAtlasScopeManager scopeManager, Span span, boolean finishOnClose) {
        this.finishOnClose = finishOnClose;
        this.span = span;
        this.scopeManager = scopeManager;
        // store the previous scope so that it can be recovered on close
        this.scopeToRecover = this.scopeManager.scopeThreadLocal.get();
        // push the current scope into the thread local
        // (this could be extracted into a package-level method in StarAtlasScopeManager)
        this.scopeManager.scopeThreadLocal.set(this);
    }

    /**
     * calling close means the active period of this scope in the current thread has ended
     */
    @Override
    public void close() {
        // if the currently active scope is not this one,
        // the close operation cannot continue
        if (scopeManager.active() != this) {
            throw new IllegalStateException("can not call scope close in an unexpected way");
        }
        if (finishOnClose) {
            span.finish();
        }
        // recover the previous scope
        this.scopeManager.scopeThreadLocal.set(this.scopeToRecover);
    }

    @Override
    public Span span() {
        return span;
    }
}
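The Scope implementation essentially wraps a span and, on creation, saves the previously active scope (confirming what was said above). It supports two methods:
- close: closes the current scope, finishes the wrapped span as well if finishOnClose is set, and restores the previously active scope in the thread.
- span: returns the wrapped span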
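The interplay between the ThreadLocal, the recover chain and close() can be modelled without the opentracing dependency. In the following sketch `ScopeStack` and `Entry` are hypothetical names, spans are plain objects compared by identity, and `previous` plays the role of scopeToRecover. One deliberate difference from the listings above: when the last scope is closed, the sketch removes the ThreadLocal value instead of setting it to null.

```java
/** Hypothetical, dependency-free model of the scope chain; spans are plain objects here. */
final class ScopeStack {
    private static final ThreadLocal<Entry> ACTIVE = new ThreadLocal<>();

    static final class Entry implements AutoCloseable {
        final Object span;
        final Entry previous; // plays the role of scopeToRecover

        private Entry(Object span, Entry previous) {
            this.span = span;
            this.previous = previous;
        }

        /** Only the currently active entry may be closed; closing restores the previous one. */
        @Override
        public void close() {
            if (ACTIVE.get() != this) {
                throw new IllegalStateException("only the active scope can be closed");
            }
            if (previous == null) {
                ACTIVE.remove(); // last scope: clear the thread-local slot entirely
            } else {
                ACTIVE.set(previous);
            }
        }
    }

    /** Activates a span unless it is already somewhere on the chain of this thread. */
    static Entry activate(Object span) {
        for (Entry e = ACTIVE.get(); e != null; e = e.previous) {
            if (e.span == span) {
                throw new IllegalStateException("a span cannot be activated more than once");
            }
        }
        Entry entry = new Entry(span, ACTIVE.get());
        ACTIVE.set(entry);
        return entry;
    }

    /** Returns the active entry of the current thread, or null if there is none. */
    static Entry active() {
        return ACTIVE.get();
    }
}
```

Because `Entry` implements `AutoCloseable`, a caller can write `try (ScopeStack.Entry scope = ScopeStack.activate(span)) { ... }` so that scopes are always closed in reverse order of activation, which is the order close() insists on.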
Finally, let's look at Tracer:
import io.opentracing.*;
import io.opentracing.propagation.Format;

import java.util.*;

/**
 * StarAtlasTracer
 * <p>
 * the implementation of tracer
 */
public class StarAtlasTracer implements Tracer {
    /**
     * the key of the trace id
     */
    public static final String KEY_TRACEID = "SA-TRACEID";
    /**
     * the spanId with which a normal trace starts
     */
    public static final String ROOT_SPAN_ID = "0";

    @Override
    public ScopeManager scopeManager() {
        return StarAtlasScopeManager.getInstance();
    }

    @Override
    public Span activeSpan() {
        // there may be no active scope in the current thread
        Scope scope = this.scopeManager().active();
        return scope == null ? null : scope.span();
    }

    @Override
    public SpanBuilder buildSpan(String operationName) {
        return new StarAtlasSpanBuilder(operationName);
    }

    @Override
    public <C> void inject(SpanContext spanContext, Format<C> format, C c) {
        // not implemented yet
    }

    @Override
    public <C> SpanContext extract(Format<C> format, C c) {
        // not implemented yet
        return null;
    }

    /**
     * the implementation of span builder
     */
    private class StarAtlasSpanBuilder implements SpanBuilder {
        private String operationName = StringUtils.EMPTY_STRING;
        private long startTime = -1;
        private List<StarAtlasSpanReferenceRelationship> references = Collections.emptyList();
        private final Map<String, Object> tags = new HashMap<String, Object>();
        private boolean ignoreActiveSpan = false;

        public StarAtlasSpanBuilder(String operationName) {
            this.operationName = operationName;
        }

        @Override
        public SpanBuilder asChildOf(SpanContext parentContext) {
            return addReference(References.CHILD_OF, parentContext);
        }

        @Override
        public SpanBuilder asChildOf(Span parentSpan) {
            if (parentSpan == null) {
                return this;
            }
            return asChildOf(parentSpan.context());
        }

        @Override
        public SpanBuilder addReference(String referenceType, SpanContext referencedContext) {
            if (referencedContext == null) {
                return this;
            }
            if (!(referencedContext instanceof StarAtlasSpanContext)) {
                return this;
            }
            if (!References.CHILD_OF.equals(referenceType)
                    && !References.FOLLOWS_FROM.equals(referenceType)) {
                return this;
            }
            if (references.isEmpty()) {
                // Optimization for 99% situations, when there is only one parent
                references = Collections.singletonList(new StarAtlasSpanReferenceRelationship(
                        (StarAtlasSpanContext) referencedContext, referenceType));
            } else {
                if (references.size() == 1) {
                    // switch from the immutable singleton list to a mutable list, keeping the order
                    references = new ArrayList<StarAtlasSpanReferenceRelationship>(references);
                }
                references.add(new StarAtlasSpanReferenceRelationship(
                        (StarAtlasSpanContext) referencedContext, referenceType));
            }
            return this;
        }

        @Override
        public SpanBuilder ignoreActiveSpan() {
            throw new UnsupportedOperationException("ignoreActiveSpan is not supported yet");
        }

        @Override
        public SpanBuilder withTag(String key, String value) {
            this.tags.put(key, value);
            return this;
        }

        @Override
        public SpanBuilder withTag(String key, boolean value) {
            this.tags.put(key, value);
            return this;
        }

        @Override
        public SpanBuilder withTag(String key, Number value) {
            this.tags.put(key, value);
            return this;
        }

        @Override
        public SpanBuilder withStartTimestamp(long startTime) {
            this.startTime = startTime;
            return this;
        }

        @Override
        public Scope startActive(boolean finishOnClose) {
            Span span = this.start();
            return StarAtlasTracer.this.scopeManager().activate(span, finishOnClose);
        }

        @Override
        public Span startManual() {
            // deprecated alias of start() in the opentracing API; delegate instead of returning null
            return this.start();
        }

        @Override
        public Span start() {
            StarAtlasSpanContext spanContext;
            if (!this.references.isEmpty()) {
                // there is an explicit parent context
                spanContext = createChildContext();
            } else if (!this.ignoreActiveSpan
                    && StarAtlasTracer.this.scopeManager().active() != null) {
                // use the currently active span as the default parent
                Scope currentScope = StarAtlasTracer.this.scopeManager().active();
                this.asChildOf(currentScope.span());
                spanContext = createChildContext();
            } else {
                // no parent: this span is the root
                spanContext = createRootSpanContext();
            }
            // NOTE: withStartTimestamp is specified in microseconds by the opentracing API,
            // while this fallback is in milliseconds; unify the units before reporting start times
            long begin = this.startTime > 0 ? this.startTime : System.currentTimeMillis();
            return new StarAtlasSpan(StarAtlasTracer.this, begin,
                    this.references, this.operationName, spanContext, this.tags);
        }

        private StarAtlasSpanContext createRootSpanContext() {
            String traceId = TraceIdGenerator.generate();
            return new StarAtlasSpanContext(traceId, ROOT_SPAN_ID, StringUtils.EMPTY_STRING);
        }

        private StarAtlasSpanContext createChildContext() {
            StarAtlasSpanContext preferredReference = preferredReference();
            // the child inherits the trace id and sampling flag and gets the parent's next child id
            return new StarAtlasSpanContext(
                    preferredReference.getTraceId(), preferredReference.nextChildContextId(),
                    preferredReference.getSpanId(), preferredReference.isSampled());
        }

        /**
         * choose the preferred reference
         *
         * @return the context of the first CHILD_OF reference, or of the first reference if there is none
         */
        private StarAtlasSpanContext preferredReference() {
            StarAtlasSpanReferenceRelationship preferredReference = references.get(0);
            for (StarAtlasSpanReferenceRelationship reference : references) {
                // childOf takes precedence as a preferred parent
                String referencedType = reference.getReferenceType();
                if (References.CHILD_OF.equals(referencedType)
                        && !References.CHILD_OF.equals(preferredReference.getReferenceType())) {
                    preferredReference = reference;
                    break;
                }
            }
            return preferredReference.getSpanContext();
        }
    }
}
This borrows from the sofa-tracer implementation. The main logic is an implementation of SpanBuilder that creates spans, plus an entry point for activating a span.
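One detail of the builder that is easy to miss is how the parent is chosen when a span has several references. The sketch below isolates that rule with the JDK only. `Reference` and `ReferencePicker` are hypothetical stand-ins for StarAtlasSpanReferenceRelationship and the builder's preferredReference method, and the two type constants mirror the string values of opentracing's `References.CHILD_OF` and `References.FOLLOWS_FROM`.

```java
import java.util.List;

/** Hypothetical stand-in for StarAtlasSpanReferenceRelationship. */
final class Reference {
    static final String CHILD_OF = "child_of";
    static final String FOLLOWS_FROM = "follows_from";

    final String type;
    final String spanId;

    Reference(String type, String spanId) {
        this.type = type;
        this.spanId = spanId;
    }
}

/** Hypothetical helper that isolates the parent-selection rule of the span builder. */
final class ReferencePicker {
    /** Returns the first CHILD_OF reference, or the first reference if none is CHILD_OF. */
    static Reference preferred(List<Reference> references) {
        if (references.isEmpty()) {
            throw new IllegalArgumentException("at least one reference is required");
        }
        for (Reference reference : references) {
            if (Reference.CHILD_OF.equals(reference.type)) {
                return reference;
            }
        }
        return references.get(0);
    }
}
```

The chosen reference then supplies the trace id, the sampling flag and the next child span id for the new span, as in createChildContext above.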
Testing
With these pieces in place, we can write the following unit tests:
import io.opentracing.Scope;
import io.opentracing.Span;
import org.junit.Assert;
import org.junit.Test;

/**
 * StarAtlasTracerTest
 */
public class StarAtlasTracerTest {
    /**
     * generate a root span only, without activating it
     */
    @Test
    public void generateRoot() {
        StarAtlasTracer starAtlasTracer = new StarAtlasTracer();
        Span root = starAtlasTracer.buildSpan("root").start();
        Assert.assertNotNull(root);
        StarAtlasSpanContext context = (StarAtlasSpanContext) root.context();
        Assert.assertEquals("0", context.getSpanId());
        Assert.assertEquals("", context.getParentId());
        Assert.assertFalse(StringUtils.isBlank(context.getTraceId()));
        Assert.assertNull(starAtlasTracer.scopeManager().active());
    }

    /**
     * generate a root span and activate it
     */
    @Test
    public void generateRootAndActivate() {
        StarAtlasTracer starAtlasTracer = new StarAtlasTracer();
        Scope rootScope = starAtlasTracer.buildSpan("root").startActive(true);
        Assert.assertNotNull(rootScope);
        StarAtlasSpanContext context = (StarAtlasSpanContext) rootScope.span().context();
        Assert.assertEquals("0", context.getSpanId());
        Assert.assertEquals("", context.getParentId());
        Assert.assertNotNull(starAtlasTracer.scopeManager().active());
        Assert.assertEquals(rootScope, starAtlasTracer.scopeManager().active());
        rootScope.close();
        Assert.assertNull(starAtlasTracer.scopeManager().active());
    }

    /**
     * generate a child span and activate it
     */
    @Test
    public void generateChildAndActivate() {
        StarAtlasTracer starAtlasTracer = new StarAtlasTracer();
        Scope rootScope = starAtlasTracer.buildSpan("root").startActive(true);
        Assert.assertNotNull(rootScope);
        StarAtlasSpanContext rootContext = (StarAtlasSpanContext) rootScope.span().context();
        Span child = starAtlasTracer.buildSpan("child").asChildOf(rootScope.span()).start();
        StarAtlasSpanContext context = (StarAtlasSpanContext) child.context();
        Assert.assertEquals("0.1", context.getSpanId());
        Assert.assertEquals(rootContext.getTraceId(), context.getTraceId());
        // starting the child does not activate it
        Assert.assertEquals(rootScope, starAtlasTracer.scopeManager().active());
        Scope childScope = starAtlasTracer.scopeManager().activate(child, true);
        Assert.assertEquals(childScope, starAtlasTracer.scopeManager().active());
        childScope.close();
        // closing the child restores the root scope
        Assert.assertEquals(rootScope, starAtlasTracer.scopeManager().active());
        rootScope.close();
    }

    /**
     * activating the same span twice must fail
     */
    @Test
    public void testDuplicatedActivate() {
        StarAtlasTracer starAtlasTracer = new StarAtlasTracer();
        Span root = starAtlasTracer.buildSpan("root").start();
        Scope rootScope = starAtlasTracer.scopeManager().activate(root, true);
        Span child = starAtlasTracer.buildSpan("child").start();
        Scope childScope = starAtlasTracer.scopeManager().activate(child, true);
        try {
            starAtlasTracer.scopeManager().activate(root, true);
            // without this the test would pass even if no exception were thrown
            Assert.fail("activating the same span twice should throw");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        } finally {
            // always restore the thread-local state for the other tests
            childScope.close();
            rootScope.close();
        }
    }
}
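The test scenarios are described in the comments; if you are interested, run them yourself.
Afterword
This article covered the basic concepts of opentracing and provided a basic implementation with tests. Time and energy permitting, follow-up articles may discuss how to plug into dubbo, HTTP, and similar scenarios. If you have questions, feel free to discuss them in the comments.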