美文网首页
14、Skywalking的OAP-核心流程串烧

14、Skywalking的OAP-核心流程串烧

作者: rock_fish | 来源:发表于2021-06-09 15:38 被阅读0次
    1. Kafka/Grpc的 Handler接收数据,解析接收到的数据构建成Source,不同的数据类型,这个过程复杂程度不同。
    2. DispatcherManager找到Source对应的SourceDispatcher,将Source传入dispatch方法中做处理
      2.1 将Source转换成对应的StorageData的子类,比如Segment(Source) 转换成 SegmentRecord(StorageData)
      2.2 调用具体的StreamProcessor处理StorageData

    流程图:https://www.processon.com/view/link/60c1e5f20791297a3f0e4411

    image.png image.png
    Kafka接收数据

    大致的流程为:Handler->解析构建Source-> SourceDispatcher#dispatch

    有哪些SourceDisptch呢,他们都处理什么类型的数据呢,这里是通过扫描的方式来实现的,其大致过程如下:

    CoreModuleProvider#start会调用SourceReceiverImpl.scan() ,再DispatcherManager#scan扫描接口SourceDispatcher的实现类,并根据实现类的dispatch方法的参数确定其处理的Source的类型;比如SourceDispatcher#dispatch处理的Source的类型是Segment

    public class SegmentDispatcher implements SourceDispatcher<Segment> {
    
        @Override
        public void dispatch(Segment source) {
        ...
        }
    

    这些被扫描到的实例,会被存储到DispatcherManager的成员变量dispatcherMap中,其key是Source#scope(),value是具体的SourceDispatcher实现类的实例。在使用的时候通过Source#scope()dispatcherMap中检索到Source对应的xxxDispatcher实例,进而调用其dispatch方法

    需要知晓SourceDispatcher 的部分实现是通过 OAL 脚本生成的,我们在源码中看不到

    BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
    BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
    DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
    EndpointCallRelationDispatcher.dispatch(EndpointRelation)
    EndpointMetaDispatcher.dispatch(EndpointMeta)
    EndpointTrafficDispatcher.dispatch(Endpoint)
    InstanceTrafficDispatcher.dispatch(ServiceInstance)
    InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
    LogRecordDispatcher.dispatch(Log)
    NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
    SegmentDispatcher.dispatch(Segment)
    ServiceCallRelationDispatcher.dispatch(ServiceRelation)
    ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
    ServiceMetaDispatcher.dispatch(ServiceMeta)
    ServiceTrafficDispatcher.dispatch(Service)
    ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
    
    存储

    启动时扫描@Stream标注,这个注解标注在StorageData上,看其核心成员:

    • name : 这个StorageData在es中对应的索引的名称 ,或者别名
    • processor : 构建数据存储处理的链路,接收数据,调用worker,进而执行存储。
    • builder :写时:StorageData->map->IndexRequest / 读时:IndexResponse->map->StorageData

    例如:

    @Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
    public class LogRecord extends AbstractLogRecord {
    

    扫描@Stream,对其解析后能会构造StreamProcessor,调用其create方法 做一些数据存储层的初始化工作:

    • 创建model,model 的关键作用:创建索引和索引模板
    • 创建dao,dao是数据访问层
    • 创建 StorageBuilder ,StorageBuilder是数据转换,存储中间件的对象与上层StorageData之间在读写操作时的互相转换。
    • 创建worker链,数据处理逻辑,这里会调用dao ,执行存储。

    这些内容的工作链路大致如下:
    0、OAP启动时,如果需要创建索引和模板,就通过Model执行创建。

    1、Kafka或Grpc接收到收据后,会解析原始数据,构造成 各种StorageData交给 RecordStreamProcessor#in

    2、遍历workder链,寻找对应StorageData的worker ,调用其in方法继续处理

    3、通常在in方法中调用dao执行存储

    4、dao中调用StorageBuilder的转换方法,将StorageData转换成map,进而拼装成es 的IndexRequest对象。

    相关文章

      网友评论

          本文标题:14、Skywalking的OAP-核心流程串烧

          本文链接:https://www.haomeiwen.com/subject/zbkvrltx.html