关于Agent端的上报和OAP的采集,Skywalking技术比较成熟,重点关注插件在拦截器中如何进行上报
ContextManager
插件进行数据上报(上传span),主要是通过ContextManager
提供的API完成的,重点方法如下
在beforeMethod
中常用的方法:
-
ContextManager#createEntrySpan
创建入口span -
ContextManager#createLocalSpan
创建本地span -
ContextManager#createExitSpan
创建出口span -
Tags#URL#set
设置span的url属性 -
span#setComponent
设置span的component属性 -
SpanLayer#asRPCFramework
标记span为RPC -
SpanLayer#asHttp
标记span为http - 等等
注意:beforeMethod
调用的各种span的create方法并没有实际的进行数据上报,只是暂存在ThreadLocal中
实际触发上报一般是在afterMethod
中,常用的方法
ContextManager#stopSpan
比较直观,即在方法结束时标记span结束,重点是该方法内部会调用TracingContext#finish
方法,这个方法会实际触发数据上报
以Dubbo插件为例,看一下插件对上报API的使用
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
// dubbo的rpcContext,使用它可以通过网络传递附件,这里就是传递上下文载体
RpcContext rpcContext = RpcContext.getContext();
if (isConsumer) {// 消费方
final ContextCarrier contextCarrier = new ContextCarrier();
// 调用createExitSpan创建出口span
span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
// 使用附件传输上下文载体
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
}
} else { // 服务方
ContextCarrier contextCarrier = new ContextCarrier();
// 跟据附件获取上下文载体
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
}
// 调用createEntrySpan创建入口span
span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
span.setPeer(rpcContext.getRemoteAddressString()); // span设置peer属性
}
Tags.URL.set(span, generateRequestURL(requestURL, invocation)); // 设置url属性
collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); // 收集参数
span.setComponent(ComponentsDefine.DUBBO); // 设置component属性
SpanLayer.asRPCFramework(span); // 设置layer属性
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
// 标记span结束
ContextManager.stopSpan();
return ret;
}
数据上报
finish
上文提到ContextManager#stopSpan
方法会触发数据上报,因为内部会调用TracingContext#finish
方法:
private void finish() {
// 正在运行的span是否为空,即segment下的span全部完成
boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
if (isFinishedInMainThread) { // 如果已完成
// 发布segment完成通知
TracingThreadListenerManager.notifyFinish(this);
}
}
可以看到span在stop之后并不一定会上报,而是整个segment下的span全部完成才触发上报,并且使用发布监听模式通知消费者处理,其中消费者就是TraceSegmentServiceClient
TraceSegmentServiceClient
TraceSegmentServiceClient接收到segment结束信号后,同样不是直接建立网络连接上报,而是将消息发给了skywalking内部的消息队列
@Override
public void afterFinished(TraceSegment traceSegment) {
if (!carrier.produce(traceSegment)) { // 发布到carrier消息队列
...
}
}
Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent写入内存消息队列,后台线程【异步】发送给 Collector
这个消息队列就是skywalking自实现的:DataCarrier
同时TraceSegmentServiceClient
本身即是消息的发布者,又是消费者
// TraceSegmentServiceClient的初始化方法
public void boot() {
...
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // 创建消息队列
carrier.consume(this, 1); // 消费者就是this, 即TraceSegmentServiceClient本身
}
而消费的方法就是TraceSegmentServiceClient#consume
方法
@Override
public void consume(List<TraceSegment> data) { // 参数就是Segment,因为使用队列,可能是多个
// 使用Grpc包工具
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
...
});
try {
// 循环流式上报segment
for (TraceSegment segment : data) {
SegmentObject upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
} catch (Throwable t) {
LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
}
// 结束
upstreamSegmentStreamObserver.onCompleted();
}
这里的upstreamSegmentStreamObserver
工具是使用grpc-stub
的grpc协议
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
使用流式数据传输的方式实现数据上报,上传的对象SegmentObject
OAP采集
OAP采集器主要在server-receiver-plugin
包中,依然是使用插件的方式应对不同数据的采集,比如
-
skywalking-trace-receiver-plugin
分布式链路采集 -
skywalking-jvm-receiver-plugin
jvm采集 - 等等大概10几个采集插件
这里主要分析分布式链路采集插件:skywalking-trace-receiver-plugin
TraceSegmentReportServiceHandler
分布式链路trace的采集主要入口TraceSegmentReportServiceHandler#collect
方法,接受流式数据对象SegmentObject
(对应了Agent端的上传对象)
@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
// 使用grpc.stub的服务端
return new StreamObserver<SegmentObject>() {
// 收到数据的处理
@Override
public void onNext(SegmentObject segment) {
try {
// 调用segmentParserService.send
segmentParserService.send(segment);
} catch (Exception e) {
...
}
}
...
};
}
具体的Segment信息处理交由segmentParserService#send
SegmentParse
Segment信息的解析器,send
方法如下
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
转交给TraceAnalyzer
分析并处理
TraceAnalyzer
其中doAnalysis
负责分析Segment对象,方法如下
// 解析SegmentO对象,基本都是订阅发布模式
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
createSpanListeners();
notifySegmentListener(segmentObject);
// 循环所有的span
segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
// 解析第一个 Span
notifyFirstListener(spanObject, segmentObject);
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
// 解析出口Span
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
// 解析入口Span
notifyEntryListener(spanObject, segmentObject);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
// 解析本地Span
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
// 通知 Span 监听器们,执行构建各自的数据
notifyListenerToBuild();
}
总结就是通知 Span 监听器们,去构建各自的数据,经过流式处理,最终存储到存储器。
网友评论