美文网首页
分布式链路追踪skywalking(二)-oap服务篇

分布式链路追踪skywalking(二)-oap服务篇

作者: 后来丶_a24d | 来源:发表于2022-02-16 15:11 被阅读0次

    思维导图

    思维导图.png

    分布式链路追踪skywalking系列


    启动流程

    入口.png
    • 最终到OAPServerBootstrap类,去除非核心代码后
     public static void start() {
        String mode = System.getProperty("mode");
        RunningMode.setMode(mode);
        
        ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
        ModuleManager manager = new ModuleManager();
        // 1. 读取配置
        ApplicationConfiguration applicationConfiguration = configLoader.load();
        // 2. 初始化给定的模块
        manager.init(applicationConfiguration);
    
        manager.find(TelemetryModule.NAME)
               .provider()
               .getService(MetricsCreator.class)
               .createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
               // Set uptime to second
                   .setValue(System.currentTimeMillis() / 1000d);  
    }
    
    • 核心分为2步
    1. 读取配置: 加载配置文件application.yml 配置文件分位三层,module、provider、property,覆盖默认配置文件,在环境变量中获取,只覆盖存在的
    public ApplicationConfiguration load() throws ConfigFileNotFoundException {
        ApplicationConfiguration configuration = new ApplicationConfiguration();
        //加载配置文件application.yml 配置文件分位三层,module、provider、property
        this.loadConfig(configuration);
        //覆盖默认配置文件,在环境变量中获取,只覆盖存在的(不能新加)。
        this.overrideConfigBySystemEnv(configuration);
        return configuration;
    }
    
    1. 初始化给定的模块
    public void init(
        ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
        // 1.读取配置中的模块名称
        String[] moduleNames = applicationConfiguration.moduleList();
        // 通过SPI加载各个模块 ModuleDefine,ModuleProvider模块
        ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
        ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
    
        HashSet<String> moduleSet = new HashSet<>(Arrays.asList(moduleNames));
        for (ModuleDefine module : moduleServiceLoader) {
            if (moduleSet.contains(module.name())) {
                // 2. 调用provider模块的预处理方法,在模块抽象类中,具体的功能,初始化模块的provider.prepare
                module.prepare(this, applicationConfiguration.getModuleConfiguration(module.name()), moduleProviderLoader);
                loadedModules.put(module.name(), module);
                moduleSet.remove(module.name());
            }
        }
        // Finish prepare stage
        isInPrepareStage = false;
    
        if (moduleSet.size() > 0) {
            throw new ModuleNotFoundException(moduleSet.toString() + " missing.");
        }
    
        BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
        // 3. 调用各个provide模块的的start方法
        bootstrapFlow.start(this);
        // 4. 调用各个provide模块的的notify方法
        bootstrapFlow.notifyAfterCompleted();
    }
    

    2.1 读取配置中的模块名称:读取从1那里的配置模块
    2.2 调用provider模块的预处理方法: ModuleDefine -> ModuleProvider,可插拔式的模块启动可选择性大,集群启动Provider就有etcd, nacos等,配置读取模块就有apollo等可供选择,apollo的配置读取会在prepare时就读取了


    Provider实现类.png

    2.3 调用各个provide模块的的start方法:比如存储Provider使用es时StorageModuleElasticsearch7Provider,prepare方法会用一些注册信息,并new HighClient客户端,在start时才连接。KafkaFetcherProvider也跟es7存储类似
    2.4 调用各个provide模块的的notify方法,起到后置处理作用

    服务注册发现模块

    • 可配置使用etcd, nacos等服务注册发现,如果使用nacos的话对应ClusterModuleNacosProvider这个类,prepare方法处理连接之类的,start和notify方法为空

    KafkaFetcherProvider模块

    • 可以改变发送数据的方式,比如将客户端默认的grpc方式改成kafka发送就需要用这个模块,prepare方法会用一些注册信息,比如new consumer,在start时才启动消费

    存储模块

    • 可根据配置选择存储介质,如果用es7作为数据存储介质的话,存在模块会调用StorageModuleElasticsearch7Provider这个Provider模块。prepare方法会用一些注册信息,并new HighClient客户端,在start时才连接

    配置模块

    • 可配置使用apollo, nacos, 自研等配置服务

    DataTTLKeeperTimer

    • 数据清理,会使用到服务注册发现, oap服务注册发现还有用来广播的功能,比如查询服务列表然后广播到其他服务器
    • Metrics、Trace 等(时间相关的数据)对应的 ES 索引都是按照时间进行切分的,随着时间的推移,ES 索引会越来越多。为了解决这个问题,SkyWalking 只会在 ES 中存储一段时间内的数据,CoreModuleProvider 会启动 DataTTLKeeperTimer 定时清理过期数据

    OAP数据接收

    • 启动时CoreModuleProvider核心provider会初始化GRPCServer, 只留下GRPCServer相关的核心代码
    # CoreModuleProvider
    public void prepare() {
        if (moduleConfig.isGRPCSslEnabled()) {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
                                        moduleConfig.getGRPCSslCertChainPath(),
                                        moduleConfig.getGRPCSslKeyPath()
            );
        } else {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
        }
        if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
            grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
        }
        if (moduleConfig.getMaxMessageSize() > 0) {
            grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
        }
        if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
            grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
        }
        if (moduleConfig.getGRPCThreadPoolSize() > 0) {
            grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
        }
        grpcServer.initialize();
    
    }
    
    • GRPCServer 用于接收 SkyWalking Agent 发送的 gRPC 请求,SkyWalking Agent 会切分OAP服务列表配置项,得到 OAP 服务列表,然后从其中随机选择一个 OAP 服务创建长连接,实现后续的数据上报
    • GRPCServer 处理 gRPC 请求的逻辑,封装在了 ServerHandler 实现之中的。我们可以通过两者的 addHandler() 方法,为指定请求添加相应的 ServerHandler 实现, 比如常见的TraceSegmentReportServiceHandler上报trace数据,就是利用对应的handler处理的


      handler处理时限类.png

    默认GRPC接收

    TraceSegmentReportServiceHandler接收的agent的方法,只留核心的代码

    public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
            return new StreamObserver<SegmentObject>() {
                @Override
                public void onNext(SegmentObject segment) {
                    segmentParserService.send(segment);
                }
        }
    }
    
    • SegmentParserServiceImpl中send方法
    public void send(SegmentObject segment) {
            final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
            traceAnalyzer.doAnalysis(segment);
        }
    
    • TraceAnalyzer中doAnalysis方法: 各个listen是构建存储model,notifyListenerToBuild才触发存储,比如es
    public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }
        // 创建span的listener
        createSpanListeners();
        // 通知segment监听,构建存储model
        notifySegmentListener(segmentObject);
        // 通知 first exit local的各个listen,构建存储model
        segmentObject.getSpansList().forEach(spanObject -> {
            if (spanObject.getSpanId() == 0) {
                notifyFirstListener(spanObject, segmentObject);
            }
    
            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                notifyEntryListener(spanObject, segmentObject);
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
            }
        });
        // 触发存储动作
        notifyListenerToBuild();
    }
    
    private void notifyListenerToBuild() {
        analysisListeners.forEach(AnalysisListener::build);
    }
    
    • SegmentAnalysisListener.build方法存储
    // 去掉非核心代码
    public void build() {
        segment.setEndpointId(endpointId);
        segment.setEndpointName(endpointName);
        // 存储核心
        sourceReceiver.receive(segment);
    }
    
    • SourceReceiverImpl.receive方法
    public void receive(ISource source) {
         dispatcherManager.forward(source);
    }
    
    • DispatcherManager.forward方法,只留下核心代码
    public void forward(ISource source) {
        source.prepare();
        for (SourceDispatcher dispatcher : dispatchers) {
            dispatcher.dispatch(source);
        }   
    }
    
    • SegmentDispatcher.dispatch方法,存储数据,比如es
    public void dispatch(Segment source) {
        SegmentRecord segment = new SegmentRecord();
        segment.setSegmentId(source.getSegmentId());
        segment.setTraceId(source.getTraceId());
        segment.setServiceId(source.getServiceId());
        segment.setServiceInstanceId(source.getServiceInstanceId());
        segment.setEndpointName(source.getEndpointName());
        segment.setEndpointId(source.getEndpointId());
        segment.setStartTime(source.getStartTime());
        segment.setEndTime(source.getEndTime());
        segment.setLatency(source.getLatency());
        segment.setIsError(source.getIsError());
        segment.setDataBinary(source.getDataBinary());
        segment.setTimeBucket(source.getTimeBucket());
        segment.setVersion(source.getVersion());
        segment.setTagsRawData(source.getTags());
        segment.setTags(Tag.Util.toStringList(source.getTags()));
    
        RecordStreamProcessor.getInstance().in(segment);
    }
    
    • 最终会调RecordPersistentWorker.in存储
    public void in(Record record) {
        InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
        batchDAO.asynchronous(insertRequest);    
    }
    
    • 最后到es存储BatchProcessEsDAO, 调es的bulk
    public void asynchronous(InsertRequest insertRequest) {
        if (bulkProcessor == null) {
            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
        }
    
        this.bulkProcessor.add((IndexRequest) insertRequest);
    }
    
    agent调用ExitSpan, EntrySpan, LocalSpan
    • trace id的生成类似雪花算法
    • TraceSegmen: TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span, TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace


      TraceSegmen.png
    • EntrySpan:当请求进入服务时会创建 EntrySpan 类型的 Span,它也是 TraceSegment 中的第一个 Span。例如,HTTP 服务、RPC 服务、MQ-Consumer 等入口服务的插件在接收到请求时都会创建相应的 EntrySpan
    • LocalSpan:它是在本地方法调用时可能创建的 Span 类型,在后面介绍 @Trace 注解的时候我们还会看到 LocalSpan
    • ExitSpan:当请求离开当前服务、进入其他服务时会创建 ExitSpan 类型的 Span。例如, Http Client 、RPC Client 发起远程调用或是 MQ-producer 生产消息时,都会产生该类型的 Span
    例子
    • 浏览器发起/test请求时,agent会向oap发起两个trace数据,421c2c8fe62d4e42a197a4914ec6d8d4.58.16449874961510000中421c2c8fe62d4e42a197a4914ec6d8d4表示uuid,同一个请求不同线程uuid是一样的,58表示线程,16449874961510000自增id
    @GetMapping("/test")
    public String test() throws InterruptedException {
        // 访问百度
        doGetTestOne();
        // 访问简书
        doGetTestTwo();
        // 访问csdn
        new Thread(() -> doGetTestThree()).start();
        Thread.sleep(30000);
        return "test";
    }
    
    • 可以看到访问csdn的线程请求只有ExitSpan,EntrySpan在主线程发送的trace数据请求中


      另起线程访问csdn.png
    • 两个ExitSpan是访问百度和简书的,一个EntrySpan是tomcat的


      主线程访问百度和简书.png
    • TraceSegmentServiceClient.consume会消费上述提到的List<TraceSegment> 转换成SegmentObject发给oap服务端。TraceSegmentServiceClient初始化是在agent初始化的BootService中初始化的,会消费DataCarrier数据

    学习

    各个框架服务注册发现处理

    kafka
    • 配置nginx域名(也可直接dns映射),客户端负载请求服务端, fetch服务端信息,更久topic-partation,客户端选择连接某台服务器,所有服务器都有全部服务器的元数据信息
    es
    • 配置nginx,随便连接到哪里,查询时根据文档id路由到某台,所有服务器都有全部服务器的元数据信息,根据信息决定转发路由到哪台
    apollo
    • 配置nginx域名(也可直接dns映射),负载去meta获取服务元数据信息,meta是通过eurka实现服务注册发现
    skywalking
    • 如果是kafka实现则通kafka
    • grpc实现,则可以配置nginx,负载到kong网关,网关层用k8s的服务注册发现

    分布式链路追踪选型

    Zipkin

    • 由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现
    • zipkin的链路监控粒度相对没有那么细,调用链中具体到接口级别,再进一步的调用信息并未涉及

    Pinpoint

    • 一款对Java编写的大规模分布式系统的APM工具,由韩国人开源的分布式跟踪组件
    • pinpoint应该是这三种APM组件中,数据分析最为完备的组件

    Skywalking

    • 国产的优秀APM组件,是一个对JAVA分布式应用程序集群的业务运行情况进行追踪、告警和分析的系统
    • skywalking 还支持20+的中间件、框架、类库,比如:主流的dubbo、Okhttp,还有DB和消息中间件, skywalking链路调用分析比zipkin完备些

    对比

    • Zipkin 使用修改过的类库和它自己的容器(Finagle)来提供分布式事务跟踪的功能。但是,它要求在需要时修改代码。skywalking和pinpoint都是基于字节码增强的方式
    • Pinpoint 的后端存储基于 HBase,而 Zipkin 基于 Cassandra
    • Pinpoint的不足:
    1. 不支持异步执行的调用链追踪(比如多线程、MQ),而SW通过注解可以支持。
    2. 功能比较少,例如缺少平均响应、平均吞吐量等数据,缺少慢服务的统计。
    3. 调用链信息,可以扩展和丰富的程度,要低于SW

    参考文章

    相关文章

      网友评论

          本文标题:分布式链路追踪skywalking(二)-oap服务篇

          本文链接:https://www.haomeiwen.com/subject/hvrkmltx.html