美文网首页
14、Skywalking的OAP-核心流程串烧

14、Skywalking的OAP-核心流程串烧

作者: rock_fish | 来源:发表于2021-06-09 15:38 被阅读0次
  1. Kafka/Grpc的 Handler接收数据,解析接收到的数据构建成Source,不同的数据类型,这个过程复杂程度不同。
  2. DispatcherManager找到Source对应的SourceDispatcher,将Source传入dispatch方法中做处理
    2.1 将Source转换成对应的StorageData的子类,比如Segment(Source) 转换成 SegmentRecord(StorageData)
    2.2 调用具体的StreamProcessor处理StorageData

流程图:https://www.processon.com/view/link/60c1e5f20791297a3f0e4411

image.png image.png
Kafka接收数据

大致的流程为:Handler->解析构建Source-> SourceDispatcher#dispatch

有哪些SourceDisptch呢,他们都处理什么类型的数据呢,这里是通过扫描的方式来实现的,其大致过程如下:

CoreModuleProvider#start会调用SourceReceiverImpl.scan() ,再DispatcherManager#scan扫描接口SourceDispatcher的实现类,并根据实现类的dispatch方法的参数确定其处理的Source的类型;比如SourceDispatcher#dispatch处理的Source的类型是Segment

public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override
    public void dispatch(Segment source) {
    ...
    }

这些被扫描到的实例,会被存储到DispatcherManager的成员变量dispatcherMap中,其key是Source#scope(),value是具体的SourceDispatcher实现类的实例。在使用的时候通过Source#scope()dispatcherMap中检索到Source对应的xxxDispatcher实例,进而调用其dispatch方法

需要知晓SourceDispatcher 的部分实现是通过 OAL 脚本生成的,我们在源码中看不到

BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
存储

启动时扫描@Stream标注,这个注解标注在StorageData上,看其核心成员:

  • name : 这个StorageData在es中对应的索引的名称 ,或者别名
  • processor : 构建数据存储处理的链路,接收数据,调用worker,进而执行存储。
  • builder :写时:StorageData->map->IndexRequest / 读时:IndexResponse->map->StorageData

例如:

@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class LogRecord extends AbstractLogRecord {

扫描@Stream,对其解析后能会构造StreamProcessor,调用其create方法 做一些数据存储层的初始化工作:

  • 创建model,model 的关键作用:创建索引和索引模板
  • 创建dao,dao是数据访问层
  • 创建 StorageBuilder ,StorageBuilder是数据转换,存储中间件的对象与上层StorageData之间在读写操作时的互相转换。
  • 创建worker链,数据处理逻辑,这里会调用dao ,执行存储。

这些内容的工作链路大致如下:
0、OAP启动时,如果需要创建索引和模板,就通过Model执行创建。

1、Kafka或Grpc接收到收据后,会解析原始数据,构造成 各种StorageData交给 RecordStreamProcessor#in

2、遍历workder链,寻找对应StorageData的worker ,调用其in方法继续处理

3、通常在in方法中调用dao执行存储

4、dao中调用StorageBuilder的转换方法,将StorageData转换成map,进而拼装成es 的IndexRequest对象。

相关文章

  • 14、Skywalking的OAP-核心流程串烧

    Kafka/Grpc的 Handler接收数据,解析接收到的数据构建成Source,不同的数据类型,这个过程复杂程...

  • 17、skywalking的OAP-通过SegmentTrace

    SegmentTrace 包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:KafkaFet...

  • 16、skywalking的OAP-通过SegmentTrace

    SegmentTrace的核心处理流程 包含了从Kafka初始化,接收数据、解析构建、存储;核心的源码流程如下:K...

  • Skywalking的流程

    Skywalking的流程(本文已6.5.0为例)1.将skywalking下载下来2.解压skywalking文...

  • 破界突围之路:关于职能型vs流程型组织差异的一点思考

    流程建设作为近期核心重点工作之一,最近也一直思考怎么样把流程建设的方法串一串,至少自己能把自己说通。 今天...

  • 串烧

    (南山南) 你在南方的艳阳里 大雪纷飞 我在北方的寒夜里 四季如春 如果天黑之前来得及 我要忘了你的眼睛 穷极一生...

  • 串烧

    《告白气球》-亲爱的爱上你,重那天起,甜蜜的很轻易,亲爱的,别任性,你的眼睛,在说我爱你。 《有点甜》-是你让我看...

  • 串烧

    #一只蟑螂从角落里窜了出来,他马上条件反射地抬起脚一把踩了下去…他把蟑螂的尸体连带垃圾一起扔在路边的一根电线杆旁。...

  • 串烧

    沧海一声笑, 任我去逍遥, 把酒问青天, 何时晴来了? 乘上公交车, 穿梭雨雾中, 谁人知我心, 终点站到了, 苍...

  • 3.1-机智云设备接入之多种模组串口烧写说明

    一、机智云 GAgent For HF-LPB100串口烧写说明 1、操作流程如下: 【1】下载HF-LPB100...

网友评论

      本文标题:14、Skywalking的OAP-核心流程串烧

      本文链接:https://www.haomeiwen.com/subject/zbkvrltx.html