SegmentTrace
包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
KafkaFetcher
-> TraceSegmentHandler#handle
->SegmentParserServiceImpl#send
-> TraceAnalyzer#doAnalysis
-> AnalysisListener#parsexxx
-> AnalysisListener#build
->SourceReceiver#receive
-> dispatcherManager#forward
-> XXXDispatcher#dispatch
Kafka pull数据处理
TraceSegmentHandler#handle
中调用SegmentParserServiceImpl#send(segment)
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
doAnalysis
解析segment
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
createSpanListeners();//创建监听器
notifySegmentListener(segmentObject);//处理trace
segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
notifyListenerToBuild();
}
notifyEntryListener
中调用MultiScopesAnalysisListener#parseEntry
,把上下游的链路信息完善到sourceBuilder里,并添加到entrySourceBuilders中,在build
环节,进一步构建成各维度的souce数据,包括链路的trace,以及调用统计如调用次数,pxx,响应时长等metric信息都在这个环节创建。
...
sourceReceiver.receive(entrySourceBuilder.toAll());
sourceReceiver.receive(entrySourceBuilder.toService());
sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
sourceReceiver.receive(entrySourceBuilder.toEndpoint());
sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
sourceReceiver.receive(endpointRelation);
...
从上边MultiScopesAnalysisListener#build
代码片段中可以看到包含了Service
、ServiceInstance
、Endpoint
、ServiceRelation
、ServiceInstanceRelation
、EndpointRelation
这些类型的Source
;并将这些Source
提交给sourceReceiver
,其底层封装的DispatcherManager
会根据 Source
的类型选择相应的SourceDispatcher
,通过方法dispatch
进一步处理。
具体的SourceDispatcher类是哪一个呢?
public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
@Override
public void dispatch(EndpointRelation source) {
switch (source.getDetectPoint()) {
case SERVER:
serverSide(source);
break;
}
}
从这个类EndpointCallRelationDispatcher#dispatch
的参数可以看出,这个类负责EndpointRelation
这种类型的Source。查看dispatch的实现有以下这些:
BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
从以上清单中不难发现还缺少好多Source以及对应的Dispatcher类型;这些缺失的类,在Skywalking中是通过OAL机制在OAP启动时动态生成,OAL脚本位于/config文件夹中,用户只需更改并重新启动服务器即可使其生效。但是,OAL脚本还是编译语言,OAL运行时会动态生成Java代码。
可以在系统环境中添加SW_OAL_ENGINE_DEBUG=Y打开开关,以查看生成了哪些类,在oal-rt目录下的dispatcher 和 metrics两个目录查看
image.png image.png
这些生成的Metric的主要SCOPE为
All
,Service
,ServiceInstance
,Endpoint
,ServiceRelation
,ServiceInstanceRelation
,EndpointRelation
。此外,还有一些辅助SCOPE。查看官网的SCOPE定义,可以找到所有现有的SCOPE和字段
Source
类的scope方法指定了SourceDispatcher的一个数字标识,
public abstract class Source {
public abstract int scope();
最终这些Source会在SourceDispatcher的dispatch中,转换成StorageData,并交由MetricsStreamProcessor#in 进入L1、L2的聚合处理,报警处理,导出处理。
L1聚合
创建Worker,并构建worker链路:
- 启动扫描Stream注解的时候,在StreamAnnotationListener#notify中,通过MetricsStreamProcessor#create方法为每种Metrics生成一个MetricsAggregateWorker(当前实例内L1聚合),创建并注册一个这种Metric类型的远程Worker服务MetricsPersistentWorker(给其他实例的数据做L2聚合和报警、存储)
- 创建MetricsRemoteWorker并指定为MetricsAggregateWorker(L1聚合)的nextWorker,当完成L1聚合后将通过MetricsRemoteWorker当前的数据传递给远程的Worker服务MetricsPersistentWorker用于L2处理
-
数据在worker链路的流传的逻辑为:MetricsAggregateWorker(本实例做L1) -> MetricsRemoteWorke(本实例传递给远程MetricsPersistentWorker) -> MetricsPersistentWorker(远程实例,完成L2处理)->min级数据存储/更新->执行Hour聚合处理->执行day聚合处理->提交给AlarmWorker->提交给ExportWorker
image.png
- MetricsAggregateWorker的一些实现细节: 接收到Metrics数据后,放入dataCarrier(10000*2)中,然后有一个线程去消费处理Metric,将metric丢入MergableBufferedData中执行初次的聚合,MergableBufferedData中是一个map,遇到id相同的则执行聚合
public void accept(final METRICS data) {
final String id = data.id();
final METRICS existed = buffer.get(id);
if (existed == null) {
buffer.put(id, data);
} else {
final boolean isAbandoned = !existed.combine(data);
if (isAbandoned) {
buffer.remove(id);
}
}
}
MetricsPersistentWorker完成L2聚合
MetricsPersistentWorker内部使用了读写buffer缓冲,且buffer是可聚合的
即处理数据的时候,是:
- 丢入写buffer,这个写buffer在接收数据的时候具有聚合的作用
- 定时任务读buffer,这时候交换buffer的读写标识,把之前已写入数据的buffer变成读buffer,将数据读出来,进行下一步的处理。
MetricsRemoteWorker对应的远程服务是MetricsPersistentWorker,其内部有这三个很重要的worker,从其名字基本就可知道这些worker完成什么任务。
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker);
在通过in方法查看到数据写入buffer后,读buffer未完成后续操作的逻辑稍微绕一些,是在PersistenceTimer#start 中开启一个定时任务定时读数据进行处理,间隔是persistentPeriod(默认是3秒)
public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
//取出一批
final List<INPUT> dataList = getCache().read();
//预处理
prepareBatch(dataList, prepareRequests);
}
prepareBatch中是最核心的逻辑:
- 在prepareBatch中遍历Metrics
- 每个metric记录都要交给transWorker做处理
- 当已处理的数据满2000条的时候写ES
- 当当前批次全部处理完的时候写ES
- 写ES的时候,如果记录已存在,则先聚合老数据再更新
- 写ES完成后,尝试将数据交给nextAlarmWorker和nextExportWorker。
网友评论