美文网首页
17、skywalking的OAP-通过SegmentTrace

17、skywalking的OAP-通过SegmentTrace

作者: rock_fish | 来源:发表于2021-06-21 18:42 被阅读0次
    SegmentTrace

    包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
    KafkaFetcher -> TraceSegmentHandler#handle ->SegmentParserServiceImpl#send -> TraceAnalyzer#doAnalysis -> AnalysisListener#parsexxx -> AnalysisListener#build ->SourceReceiver#receive -> dispatcherManager#forward-> XXXDispatcher#dispatch

    Kafka pull数据处理

    TraceSegmentHandler#handle 中调用SegmentParserServiceImpl#send(segment)

    public void send(SegmentObject segment) {
            final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
            traceAnalyzer.doAnalysis(segment);
        }
    

    doAnalysis解析segment

    public void doAnalysis(SegmentObject segmentObject) {
            if (segmentObject.getSpansList().size() == 0) {
                return;
            }
    
            createSpanListeners();//创建监听器
    
            notifySegmentListener(segmentObject);//处理trace
    
            segmentObject.getSpansList().forEach(spanObject -> {
                if (spanObject.getSpanId() == 0) {
                    notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
                }
    
                if (SpanType.Exit.equals(spanObject.getSpanType())) {
                    notifyExitListener(spanObject, segmentObject);
                } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                    notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
                } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                    notifyLocalListener(spanObject, segmentObject);
                } else {
                    log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                              .name());
                }
            });
    
            notifyListenerToBuild();
        }
    

    notifyEntryListener中调用MultiScopesAnalysisListener#parseEntry,把上下游的链路信息完善到sourceBuilder里,并添加到entrySourceBuilders中,在build环节,进一步构建成各维度的souce数据,包括链路的trace,以及调用统计如调用次数,pxx,响应时长等metric信息都在这个环节创建。

    ...
    sourceReceiver.receive(entrySourceBuilder.toAll());
    sourceReceiver.receive(entrySourceBuilder.toService());
    sourceReceiver.receive(entrySourceBuilder.toServiceInstance());
    sourceReceiver.receive(entrySourceBuilder.toEndpoint());
    sourceReceiver.receive(entrySourceBuilder.toServiceRelation());
    sourceReceiver.receive(entrySourceBuilder.toServiceInstanceRelation());
    EndpointRelation endpointRelation = entrySourceBuilder.toEndpointRelation();
    sourceReceiver.receive(endpointRelation);
    ...
    

    从上边MultiScopesAnalysisListener#build代码片段中可以看到包含了ServiceServiceInstanceEndpointServiceRelationServiceInstanceRelationEndpointRelation这些类型的Source;并将这些Source提交给sourceReceiver,其底层封装的DispatcherManager会根据 Source的类型选择相应的SourceDispatcher,通过方法dispatch进一步处理。

    具体的SourceDispatcher类是哪一个呢?

    public class EndpointCallRelationDispatcher implements SourceDispatcher<EndpointRelation> {
    
        @Override
        public void dispatch(EndpointRelation source) {
            switch (source.getDetectPoint()) {
                case SERVER:
                    serverSide(source);
                    break;
            }
        }
    

    从这个类EndpointCallRelationDispatcher#dispatch的参数可以看出,这个类负责EndpointRelation这种类型的Source。查看dispatch的实现有以下这些:

    BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
    BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
    DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
    EndpointCallRelationDispatcher.dispatch(EndpointRelation)
    EndpointMetaDispatcher.dispatch(EndpointMeta)
    EndpointTrafficDispatcher.dispatch(Endpoint)
    InstanceTrafficDispatcher.dispatch(ServiceInstance)
    InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
    LogRecordDispatcher.dispatch(Log)
    NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
    SegmentDispatcher.dispatch(Segment)
    ServiceCallRelationDispatcher.dispatch(ServiceRelation)
    ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
    ServiceMetaDispatcher.dispatch(ServiceMeta)
    ServiceTrafficDispatcher.dispatch(Service)
    ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
    

    从以上清单中不难发现还缺少好多Source以及对应的Dispatcher类型;这些缺失的类,在Skywalking中是通过OAL机制在OAP启动时动态生成,OAL脚本位于/config文件夹中,用户只需更改并重新启动服务器即可使其生效。但是,OAL脚本还是编译语言,OAL运行时会动态生成Java代码。

    可以在系统环境中添加SW_OAL_ENGINE_DEBUG=Y打开开关,以查看生成了哪些类,在oal-rt目录下的dispatcher 和 metrics两个目录查看


    image.png image.png
    这些生成的Metric的主要SCOPEAllServiceServiceInstanceEndpointServiceRelationServiceInstanceRelationEndpointRelation。此外,还有一些辅助SCOPE。查看官网的SCOPE定义,可以找到所有现有的SCOPE和字段

    Source类的scope方法指定了SourceDispatcher的一个数字标识,

    public abstract class Source {
        public abstract int scope();
    

    最终这些Source会在SourceDispatcher的dispatch中,转换成StorageData,并交由MetricsStreamProcessor#in 进入L1、L2的聚合处理,报警处理,导出处理。

    L1聚合

    创建Worker,并构建worker链路:

    1. 启动扫描Stream注解的时候,在StreamAnnotationListener#notify中,通过MetricsStreamProcessor#create方法为每种Metrics生成一个MetricsAggregateWorker(当前实例内L1聚合),创建并注册一个这种Metric类型的远程Worker服务MetricsPersistentWorker(给其他实例的数据做L2聚合和报警、存储)
    2. 创建MetricsRemoteWorker并指定为MetricsAggregateWorker(L1聚合)的nextWorker,当完成L1聚合后将通过MetricsRemoteWorker当前的数据传递给远程的Worker服务MetricsPersistentWorker用于L2处理
    3. 数据在worker链路的流传的逻辑为:MetricsAggregateWorker(本实例做L1) -> MetricsRemoteWorke(本实例传递给远程MetricsPersistentWorker) -> MetricsPersistentWorker(远程实例,完成L2处理)->min级数据存储/更新->执行Hour聚合处理->执行day聚合处理->提交给AlarmWorker->提交给ExportWorker


      image.png
    1. MetricsAggregateWorker的一些实现细节: 接收到Metrics数据后,放入dataCarrier(10000*2)中,然后有一个线程去消费处理Metric,将metric丢入MergableBufferedData中执行初次的聚合,MergableBufferedData中是一个map,遇到id相同的则执行聚合
    public void accept(final METRICS data) {
            final String id = data.id();
            final METRICS existed = buffer.get(id);
            if (existed == null) {
                buffer.put(id, data);
            } else {
                final boolean isAbandoned = !existed.combine(data);
                if (isAbandoned) {
                    buffer.remove(id);
                }
            }
        }
    
    MetricsPersistentWorker完成L2聚合

    MetricsPersistentWorker内部使用了读写buffer缓冲,且buffer是可聚合的
    即处理数据的时候,是:

    1. 丢入写buffer,这个写buffer在接收数据的时候具有聚合的作用
    2. 定时任务读buffer,这时候交换buffer的读写标识,把之前已写入数据的buffer变成读buffer,将数据读出来,进行下一步的处理。

    MetricsRemoteWorker对应的远程服务是MetricsPersistentWorker,其内部有这三个很重要的worker,从其名字基本就可知道这些worker完成什么任务。

    this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
    this.nextExportWorker = Optional.ofNullable(nextExportWorker);
    this.transWorker = Optional.ofNullable(transWorker);
    

    在通过in方法查看到数据写入buffer后,读buffer未完成后续操作的逻辑稍微绕一些,是在PersistenceTimer#start 中开启一个定时任务定时读数据进行处理,间隔是persistentPeriod(默认是3秒)

       public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
            //取出一批
            final List<INPUT> dataList = getCache().read();
            //预处理
            prepareBatch(dataList, prepareRequests);
        }
    

    prepareBatch中是最核心的逻辑:

    1. 在prepareBatch中遍历Metrics
    2. 每个metric记录都要交给transWorker做处理
    3. 当已处理的数据满2000条的时候写ES
    4. 当当前批次全部处理完的时候写ES
    5. 写ES的时候,如果记录已存在,则先聚合老数据再更新
    6. 写ES完成后,尝试将数据交给nextAlarmWorker和nextExportWorker。

    相关文章

      网友评论

          本文标题:17、skywalking的OAP-通过SegmentTrace

          本文链接:https://www.haomeiwen.com/subject/tnperltx.html