思维导图
思维导图.png分布式链路追踪skywalking系列
启动流程
入口.png- 最终到OAPServerBootstrap类,去除非核心代码后
public static void start() {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
// 1. 读取配置
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 2. 初始化给定的模块
manager.init(applicationConfiguration);
manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
}
- 核心分为2步
- 读取配置: 加载配置文件application.yml 配置文件分位三层,module、provider、property,覆盖默认配置文件,在环境变量中获取,只覆盖存在的
public ApplicationConfiguration load() throws ConfigFileNotFoundException {
ApplicationConfiguration configuration = new ApplicationConfiguration();
//加载配置文件application.yml 配置文件分位三层,module、provider、property
this.loadConfig(configuration);
//覆盖默认配置文件,在环境变量中获取,只覆盖存在的(不能新加)。
this.overrideConfigBySystemEnv(configuration);
return configuration;
}
- 初始化给定的模块
public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
// 1.读取配置中的模块名称
String[] moduleNames = applicationConfiguration.moduleList();
// 通过SPI加载各个模块 ModuleDefine,ModuleProvider模块
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
HashSet<String> moduleSet = new HashSet<>(Arrays.asList(moduleNames));
for (ModuleDefine module : moduleServiceLoader) {
if (moduleSet.contains(module.name())) {
// 2. 调用provider模块的预处理方法,在模块抽象类中,具体的功能,初始化模块的provider.prepare
module.prepare(this, applicationConfiguration.getModuleConfiguration(module.name()), moduleProviderLoader);
loadedModules.put(module.name(), module);
moduleSet.remove(module.name());
}
}
// Finish prepare stage
isInPrepareStage = false;
if (moduleSet.size() > 0) {
throw new ModuleNotFoundException(moduleSet.toString() + " missing.");
}
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
// 3. 调用各个provide模块的的start方法
bootstrapFlow.start(this);
// 4. 调用各个provide模块的的notify方法
bootstrapFlow.notifyAfterCompleted();
}
2.1 读取配置中的模块名称:读取从1那里的配置模块
2.2 调用provider模块的预处理方法: ModuleDefine -> ModuleProvider,可插拔式的模块启动可选择性大,集群启动Provider就有etcd, nacos等,配置读取模块就有apollo等可供选择,apollo的配置读取会在prepare时就读取了
Provider实现类.png
2.3 调用各个provide模块的的start方法:比如存储Provider使用es时StorageModuleElasticsearch7Provider,prepare方法会用一些注册信息,并new HighClient客户端,在start时才连接。KafkaFetcherProvider也跟es7存储类似
2.4 调用各个provide模块的的notify方法,起到后置处理作用
服务注册发现模块
- 可配置使用etcd, nacos等服务注册发现,如果使用nacos的话对应ClusterModuleNacosProvider这个类,prepare方法处理连接之类的,start和notify方法为空
KafkaFetcherProvider模块
- 可以改变发送数据的方式,比如将客户端默认的grpc方式改成kafka发送就需要用这个模块,prepare方法会用一些注册信息,比如new consumer,在start时才启动消费
存储模块
- 可根据配置选择存储介质,如果用es7作为数据存储介质的话,存在模块会调用StorageModuleElasticsearch7Provider这个Provider模块。prepare方法会用一些注册信息,并new HighClient客户端,在start时才连接
配置模块
- 可配置使用apollo, nacos, 自研等配置服务
DataTTLKeeperTimer
- 数据清理,会使用到服务注册发现, oap服务注册发现还有用来广播的功能,比如查询服务列表然后广播到其他服务器
- Metrics、Trace 等(时间相关的数据)对应的 ES 索引都是按照时间进行切分的,随着时间的推移,ES 索引会越来越多。为了解决这个问题,SkyWalking 只会在 ES 中存储一段时间内的数据,CoreModuleProvider 会启动 DataTTLKeeperTimer 定时清理过期数据
OAP数据接收
- 启动时CoreModuleProvider核心provider会初始化GRPCServer, 只留下GRPCServer相关的核心代码
# CoreModuleProvider
public void prepare() {
if (moduleConfig.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath()
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}
if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();
}
- GRPCServer 用于接收 SkyWalking Agent 发送的 gRPC 请求,SkyWalking Agent 会切分OAP服务列表配置项,得到 OAP 服务列表,然后从其中随机选择一个 OAP 服务创建长连接,实现后续的数据上报
-
GRPCServer 处理 gRPC 请求的逻辑,封装在了 ServerHandler 实现之中的。我们可以通过两者的 addHandler() 方法,为指定请求添加相应的 ServerHandler 实现, 比如常见的TraceSegmentReportServiceHandler上报trace数据,就是利用对应的handler处理的
handler处理时限类.png
默认GRPC接收
TraceSegmentReportServiceHandler接收的agent的方法,只留核心的代码
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
return new StreamObserver<SegmentObject>() {
@Override
public void onNext(SegmentObject segment) {
segmentParserService.send(segment);
}
}
}
- SegmentParserServiceImpl中send方法
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
- TraceAnalyzer中doAnalysis方法: 各个listen是构建存储model,notifyListenerToBuild才触发存储,比如es
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
// 创建span的listener
createSpanListeners();
// 通知segment监听,构建存储model
notifySegmentListener(segmentObject);
// 通知 first exit local的各个listen,构建存储model
segmentObject.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
// 触发存储动作
notifyListenerToBuild();
}
private void notifyListenerToBuild() {
analysisListeners.forEach(AnalysisListener::build);
}
- SegmentAnalysisListener.build方法存储
// 去掉非核心代码
public void build() {
segment.setEndpointId(endpointId);
segment.setEndpointName(endpointName);
// 存储核心
sourceReceiver.receive(segment);
}
- SourceReceiverImpl.receive方法
public void receive(ISource source) {
dispatcherManager.forward(source);
}
- DispatcherManager.forward方法,只留下核心代码
public void forward(ISource source) {
source.prepare();
for (SourceDispatcher dispatcher : dispatchers) {
dispatcher.dispatch(source);
}
}
- SegmentDispatcher.dispatch方法,存储数据,比如es
public void dispatch(Segment source) {
SegmentRecord segment = new SegmentRecord();
segment.setSegmentId(source.getSegmentId());
segment.setTraceId(source.getTraceId());
segment.setServiceId(source.getServiceId());
segment.setServiceInstanceId(source.getServiceInstanceId());
segment.setEndpointName(source.getEndpointName());
segment.setEndpointId(source.getEndpointId());
segment.setStartTime(source.getStartTime());
segment.setEndTime(source.getEndTime());
segment.setLatency(source.getLatency());
segment.setIsError(source.getIsError());
segment.setDataBinary(source.getDataBinary());
segment.setTimeBucket(source.getTimeBucket());
segment.setVersion(source.getVersion());
segment.setTagsRawData(source.getTags());
segment.setTags(Tag.Util.toStringList(source.getTags()));
RecordStreamProcessor.getInstance().in(segment);
}
- 最终会调RecordPersistentWorker.in存储
public void in(Record record) {
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
batchDAO.asynchronous(insertRequest);
}
- 最后到es存储BatchProcessEsDAO, 调es的bulk
public void asynchronous(InsertRequest insertRequest) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
this.bulkProcessor.add((IndexRequest) insertRequest);
}
agent调用ExitSpan, EntrySpan, LocalSpan
- trace id的生成类似雪花算法
-
TraceSegmen: TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span, TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace
TraceSegmen.png - EntrySpan:当请求进入服务时会创建 EntrySpan 类型的 Span,它也是 TraceSegment 中的第一个 Span。例如,HTTP 服务、RPC 服务、MQ-Consumer 等入口服务的插件在接收到请求时都会创建相应的 EntrySpan
- LocalSpan:它是在本地方法调用时可能创建的 Span 类型,在后面介绍 @Trace 注解的时候我们还会看到 LocalSpan
- ExitSpan:当请求离开当前服务、进入其他服务时会创建 ExitSpan 类型的 Span。例如, Http Client 、RPC Client 发起远程调用或是 MQ-producer 生产消息时,都会产生该类型的 Span
例子
- 浏览器发起/test请求时,agent会向oap发起两个trace数据,421c2c8fe62d4e42a197a4914ec6d8d4.58.16449874961510000中421c2c8fe62d4e42a197a4914ec6d8d4表示uuid,同一个请求不同线程uuid是一样的,58表示线程,16449874961510000自增id
@GetMapping("/test")
public String test() throws InterruptedException {
// 访问百度
doGetTestOne();
// 访问简书
doGetTestTwo();
// 访问csdn
new Thread(() -> doGetTestThree()).start();
Thread.sleep(30000);
return "test";
}
-
可以看到访问csdn的线程请求只有ExitSpan,EntrySpan在主线程发送的trace数据请求中
另起线程访问csdn.png -
两个ExitSpan是访问百度和简书的,一个EntrySpan是tomcat的
主线程访问百度和简书.png - TraceSegmentServiceClient.consume会消费上述提到的List<TraceSegment> 转换成SegmentObject发给oap服务端。TraceSegmentServiceClient初始化是在agent初始化的BootService中初始化的,会消费DataCarrier数据
学习
各个框架服务注册发现处理
kafka
- 配置nginx域名(也可直接dns映射),客户端负载请求服务端, fetch服务端信息,更久topic-partation,客户端选择连接某台服务器,所有服务器都有全部服务器的元数据信息
es
- 配置nginx,随便连接到哪里,查询时根据文档id路由到某台,所有服务器都有全部服务器的元数据信息,根据信息决定转发路由到哪台
apollo
- 配置nginx域名(也可直接dns映射),负载去meta获取服务元数据信息,meta是通过eurka实现服务注册发现
skywalking
- 如果是kafka实现则通kafka
- grpc实现,则可以配置nginx,负载到kong网关,网关层用k8s的服务注册发现
分布式链路追踪选型
Zipkin
- 由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现
- zipkin的链路监控粒度相对没有那么细,调用链中具体到接口级别,再进一步的调用信息并未涉及
Pinpoint
- 一款对Java编写的大规模分布式系统的APM工具,由韩国人开源的分布式跟踪组件
- pinpoint应该是这三种APM组件中,数据分析最为完备的组件
Skywalking
- 国产的优秀APM组件,是一个对JAVA分布式应用程序集群的业务运行情况进行追踪、告警和分析的系统
- skywalking 还支持20+的中间件、框架、类库,比如:主流的dubbo、Okhttp,还有DB和消息中间件, skywalking链路调用分析比zipkin完备些
对比
- Zipkin 使用修改过的类库和它自己的容器(Finagle)来提供分布式事务跟踪的功能。但是,它要求在需要时修改代码。skywalking和pinpoint都是基于字节码增强的方式
- Pinpoint 的后端存储基于 HBase,而 Zipkin 基于 Cassandra
- Pinpoint的不足:
- 不支持异步执行的调用链追踪(比如多线程、MQ),而SW通过注解可以支持。
- 功能比较少,例如缺少平均响应、平均吞吐量等数据,缺少慢服务的统计。
- 调用链信息,可以扩展和丰富的程度,要低于SW
网友评论