- Kafka/Grpc的 Handler接收数据,解析接收到的数据构建成
Source
,不同的数据类型,这个过程复杂程度不同。 -
DispatcherManager
找到Source
对应的SourceDispatcher
,将Source
传入dispatch
方法中做处理
2.1 将Source
转换成对应的StorageData
的子类,比如Segment
(Source) 转换成SegmentRecord
(StorageData)
2.2 调用具体的StreamProcessor
处理StorageData
流程图:https://www.processon.com/view/link/60c1e5f20791297a3f0e4411
Kafka接收数据
大致的流程为:Handler->解析构建Source-> SourceDispatcher#dispatch
有哪些SourceDisptch呢,他们都处理什么类型的数据呢,这里是通过扫描的方式来实现的,其大致过程如下:
CoreModuleProvider#start
会调用SourceReceiverImpl.scan()
,再DispatcherManager#scan
扫描接口SourceDispatcher
的实现类,并根据实现类的dispatch
方法的参数确定其处理的Source
的类型;比如SourceDispatcher#dispatch
处理的Source
的类型是Segment
:
public class SegmentDispatcher implements SourceDispatcher<Segment> {
@Override
public void dispatch(Segment source) {
...
}
这些被扫描到的实例,会被存储到DispatcherManager
的成员变量dispatcherMap
中,其key是Source#scope()
,value是具体的SourceDispatcher
实现类的实例。在使用的时候通过Source#scope()
在dispatcherMap
中检索到Source对应的xxxDispatcher实例,进而调用其dispatch方法
需要知晓SourceDispatcher
的部分实现是通过 OAL 脚本生成的,我们在源码中看不到
BrowserAppTrafficSourceDispatcher.dispatch(SOURCE)
BrowserErrorLogRecordDispatcher.dispatch(BrowserErrorLog)
DatabaseStatementDispatcher.dispatch(DatabaseSlowStatement)
EndpointCallRelationDispatcher.dispatch(EndpointRelation)
EndpointMetaDispatcher.dispatch(EndpointMeta)
EndpointTrafficDispatcher.dispatch(Endpoint)
InstanceTrafficDispatcher.dispatch(ServiceInstance)
InstanceUpdateDispatcher.dispatch(ServiceInstanceUpdate)
LogRecordDispatcher.dispatch(Log)
NetworkAddressAliasSetupDispatcher.dispatch(NetworkAddressAliasSetup)
SegmentDispatcher.dispatch(Segment)
ServiceCallRelationDispatcher.dispatch(ServiceRelation)
ServiceInstanceCallRelationDispatcher.dispatch(ServiceInstanceRelation)
ServiceMetaDispatcher.dispatch(ServiceMeta)
ServiceTrafficDispatcher.dispatch(Service)
ZipkinSpanRecordDispatcher.dispatch(ZipkinSpan)
存储
启动时扫描@Stream标注,这个注解标注在StorageData上,看其核心成员:
- name : 这个StorageData在es中对应的索引的名称 ,或者别名
- processor : 构建数据存储处理的链路,接收数据,调用worker,进而执行存储。
- builder :写时:StorageData->map->IndexRequest / 读时:IndexResponse->map->StorageData
例如:
@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class LogRecord extends AbstractLogRecord {
扫描@Stream,对其解析后能会构造StreamProcessor,调用其create方法 做一些数据存储层的初始化工作:
- 创建model,model 的关键作用:创建索引和索引模板
- 创建dao,dao是数据访问层
- 创建 StorageBuilder ,StorageBuilder是数据转换,存储中间件的对象与上层StorageData之间在读写操作时的互相转换。
- 创建worker链,数据处理逻辑,这里会调用dao ,执行存储。
这些内容的工作链路大致如下:
0、OAP启动时,如果需要创建索引和模板,就通过Model执行创建。
1、Kafka或Grpc接收到收据后,会解析原始数据,构造成 各种StorageData交给 RecordStreamProcessor#in
2、遍历workder链,寻找对应StorageData的worker ,调用其in方法继续处理
3、通常在in方法中调用dao执行存储
4、dao中调用StorageBuilder的转换方法,将StorageData转换成map,进而拼装成es 的IndexRequest对象。
网友评论