美文网首页
16、skywalking的OAP-通过SegmentTrace

16、skywalking的OAP-通过SegmentTrace

作者: rock_fish | 来源:发表于2021-06-10 17:35 被阅读0次
    SegmentTrace的核心处理流程

    包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:
    KafkaFetcher -> TraceSegmentHandler#handle ->SegmentParserServiceImpl#send -> TraceAnalyzer#doAnalysis -> AnalysisListener#parsexxx -> AnalysisListener#build ->SourceReceiver#receive -> dispatcherManager#forward-> SegmentDispatcher#dispatch

    1. KafkaFetcherProviderstart方法中注册了TraceSegmentHandler,用于接收Trace的数据,进行处理。
    handlerRegister.register(new TraceSegmentHandler(getManager(), config));
    
    1. TraceSegmentHandler#handle处理Kafka数据
    //将Kafka数据,解析成SegmentObject
    SegmentObject segment = SegmentObject.parseFrom(record.value().get());
    //交给SegmentParserServiceImpl处理
    segmentParserService.send(segment);
    
    1. SegmentParserServiceImpl#send中通过 TraceAnalyzer 解析SegmentObject
        public void send(SegmentObject segment) {
            final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
            traceAnalyzer.doAnalysis(segment);
        }
    

    其内部功能很多,从是Segment这个数据流程来说就是:构建多个监听器,以监听器的模式来通过解析segmentObject各个属性,通过构建SourceBuilder对象来承载上下游的链路相关信息,并添加到entrySourceBuilders中;在build环节,进一步构建成各维度的souce数据,包括Trace(链路),Metrics(调用统计如调用次数,pxx,响应时长等) 信息都在这个环节创建。先大致看下其代码主体流程,接下来会分析内部更多的细节逻辑:

    public void doAnalysis(SegmentObject segmentObject) {
            if (segmentObject.getSpansList().size() == 0) {
                return;
            }
    
            createSpanListeners();//创建监听器
    
            notifySegmentListener(segmentObject);//处理trace
    
            segmentObject.getSpansList().forEach(spanObject -> {
                if (spanObject.getSpanId() == 0) {
                    notifyFirstListener(spanObject, segmentObject);//根据第一个span的信息做一些处理
                }
    
                if (SpanType.Exit.equals(spanObject.getSpanType())) {
                    notifyExitListener(spanObject, segmentObject);
                } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                    notifyEntryListener(spanObject, segmentObject);//这里有很重要的链路的metric信息构建
                } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                    notifyLocalListener(spanObject, segmentObject);
                } else {
                    log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                              .name());
                }
            });
    
            notifyListenerToBuild();
        }
    
    1. SegmentAnalysisListener#parseSegment构建Segment(Source),部分属性赋值
      1.1 赋值 起止时间
      1.2 赋值 是否error
      1.3 赋值 是否采样,这里是重点

    2. SegmentAnalysisListener#notifyFirstListener 更多的属性赋值

    3. 多个EntryAnalysisListener监听器处理Entry类型的span
      3.1 SegmentAnalysisListener#parseEntry赋值service和endpoint的Name和id

      3.2 NetworkAddressAliasMappingListener#parseEntry 构造NetworkAddressAliasSetup完善ip_port地址与别名之间的映射关心,交给NetworkAddressAliasSetupDispatcher处理

      3.3 MultiScopesAnalysisListener#parseEntry 遍历span列表

      • 3.3.1 将每个span构建成SourceBuilder,设置上下游的游的Server、Instance、endpoint的name信息,这里mq和网关特殊处理,其上游保持ip端口,因为mq、网关通常没有搭载agent,没有相关的name信息。
      • 3.3.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
      • 3.3.3 SourceBuilder添加到entrySourceBuilders,
      • 3.3.4 parseLogicEndpoints//处理span的tag是LOGIC_ENDPOINT = "x-le"类型的,添加到 logicEndpointBuilders中(用途待梳理)
    4. MultiScopesAnalysisListener#parseExit监听器处理Exit类型的span
      4.1 将span构建成SourceBuilder,设置上下游的游的ServerInstanceEndpoint的name信息,尝试把下游的ip_port信息修改成别名。
      4.2 setPublicAttrs:SourceBuilder中添加 tag信息,重点是时间bucket,setResponseCode,Status,type(http,rpc,db)
      4.3 SourceBuilder添加到exitSourceBuilders
      4.4 如果是db类型,构造slowStatementBuilder,判断时长设置慢查询标识,存入dbSlowStatementBuilders中。这里是全局的阈值 是个改造点。

    5. MultiScopesAnalysisListener#parseLocal监听器处理Local类型的span,通过parseLogicEndpoints方法处理span的tag是LOGIC_ENDPOINT = "x-le"类型的,添加到 logicEndpointBuilders中(用途待梳理)

    6. 这里是重点,以上构建的SourceBuilder就在这一步使用,执行各个AnalysisListener#build

      6.1 SegmentAnalysisListener#build,设置endpoint的 id 和name,然后将Segment交给SourceReceiver#receive处理,而SourceReceiver#receive就是调用dispatcherManager#forward,最终交给SegmentDispatcher#dispatch处理了,
      6.2 MultiScopesAnalysisListener#build中根据以上流程中创建的数据,会再构造出多种Metric 类型的Source数据交给SourceReceiver处理;这些逻辑在这篇笔记中不展开,本篇已Segment流程为主

    SegmentDispatcher 处理 Segment

    简单来说,这里的逻辑就是把Source转换成StorageData,交给RecordStreamProcessor里的一组AbstractWorker,用于完成以记录存储为主的相关工作。

    public class SegmentDispatcher implements SourceDispatcher<Segment> {
    
        @Override
        public void dispatch(Segment source) {
            //Segment(Source) 转换成 SegmentRecord(StorageData)
            SegmentRecord segment = new SegmentRecord();
            segment.setSegmentId(source.getSegmentId());
            ...
            segment.setTags(Tag.Util.toStringList(source.getTags()));
            //交给worker链路处理StorageData
            RecordStreamProcessor.getInstance().in(segment);
        }
    }
    

    具体看下RecordStreamProcessor#inRecordPersistentWorker是如何把记录存储到ES中的。RecordPersistentWorker#in中的逻辑很清晰:

    //1. 把SegmentRecord,通过RecordEsDAO的方法转换成ES的 InsertRequest
    InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
    //2. 异步的方式写es
    batchDAO.asynchronous(insertRequest);
    
    1. BatchProcessEsDAO#prepareBatchInsert方法中,通过storageBuilderSegmentRecord转换成map,再讲map构建成XContentBuilder,进而构建成ES里的InsertRequest实例
        @Override
        public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
            XContentBuilder builder = map2builder(
                IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
            String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
            String id = IndexController.INSTANCE.generateDocId(model, record.id());
            return getClient().prepareInsert(modelName, id, builder);
        }
    
    1. BatchProcessEsDAO#asynchronous
      首先初始化bulkProcessor,将InsertRequest提交给bulkProcessor,bulkProcessor是一个异步批插入的操作,细节可另行百度。

    至此Trace写到ES后,这个流程就算结束了。

    相关文章

      网友评论

          本文标题:16、skywalking的OAP-通过SegmentTrace

          本文链接:https://www.haomeiwen.com/subject/lfknrltx.html