美文网首页
Skywalking源码研究之上报和采集

Skywalking源码研究之上报和采集

作者: pq217 | 来源:发表于2024-08-10 15:56 被阅读0次

    关于Agent端的上报和OAP的采集,Skywalking技术比较成熟,重点关注插件在拦截器中如何进行上报

    ContextManager

    插件进行数据上报(上传span),主要是通过ContextManager提供的API完成的,重点方法如下

    beforeMethod中常用的方法:

    • ContextManager#createEntrySpan 创建入口span
    • ContextManager#createLocalSpan 创建本地span
    • ContextManager#createExitSpan 创建出口span
    • Tags#URL#set 设置span的url属性
    • span#setComponent 设置span的component属性
    • SpanLayer#asRPCFramework 标记span为RPC
    • SpanLayer#asHttp 标记span为http
    • 等等

    注意:beforeMethod调用的各种span的create方法并没有实际的进行数据上报,只是暂存在ThreadLocal中

    实际触发上报一般是在afterMethod中,常用的方法

    • ContextManager#stopSpan

    比较直观,即在方法结束时标记span结束,重点是该方法内部会调用TracingContext#finish方法,这个方法会实际触发数据上报

    以Dubbo插件为例,看一下插件对上报API的使用

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        // dubbo的rpcContext,使用它可以通过网络传递附件,这里就是传递上下文载体
        RpcContext rpcContext = RpcContext.getContext();
        
        if (isConsumer) {// 消费方
            final ContextCarrier contextCarrier = new ContextCarrier();
            // 调用createExitSpan创建出口span
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
            // 使用附件传输上下文载体
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
            }
        } else { // 服务方
            ContextCarrier contextCarrier = new ContextCarrier();
            // 跟据附件获取上下文载体
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }
            // 调用createEntrySpan创建入口span
            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
            span.setPeer(rpcContext.getRemoteAddressString()); // span设置peer属性
        }
    
        Tags.URL.set(span, generateRequestURL(requestURL, invocation)); // 设置url属性
        collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); // 收集参数
        span.setComponent(ComponentsDefine.DUBBO); // 设置component属性
        SpanLayer.asRPCFramework(span); // 设置layer属性
    }
    
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        
        // 标记span结束
        ContextManager.stopSpan();
        return ret;
    }
    

    数据上报

    finish

    上文提到ContextManager#stopSpan方法会触发数据上报,因为内部会调用TracingContext#finish方法:

    private void finish() {
        // 正在运行的span是否为空,即segment下的span全部完成
        boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
        if (isFinishedInMainThread) { // 如果已完成
            // 发布segment完成通知
            TracingThreadListenerManager.notifyFinish(this); 
        }
    }
    

    可以看到span在stop之后并不一定会上报,而是整个segment下的span全部完成才触发上报,并且使用发布监听模式通知消费者处理,其中消费者就是TraceSegmentServiceClient

    TraceSegmentServiceClient

    TraceSegmentServiceClient接收到segment结束信号后,同样不是直接建立网络连接上报,而是将消息发给了skywalking内部的消息队列

    @Override
    public void afterFinished(TraceSegment traceSegment) {
        if (!carrier.produce(traceSegment)) { // 发布到carrier消息队列
            ...
        }
    }
    

    Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent写入内存消息队列,后台线程【异步】发送给 Collector

    这个消息队列就是skywalking自实现的:DataCarrier

    同时TraceSegmentServiceClient本身即是消息的发布者,又是消费者

    // TraceSegmentServiceClient的初始化方法
    public void boot() {
        ...
        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // 创建消息队列
        carrier.consume(this, 1); // 消费者就是this, 即TraceSegmentServiceClient本身
    }
    

    而消费的方法就是TraceSegmentServiceClient#consume方法

    @Override
    public void consume(List<TraceSegment> data) { // 参数就是Segment,因为使用队列,可能是多个
        // 使用Grpc包工具
        final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
        StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
            Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
        ).collect(new StreamObserver<Commands>() {
            ...
        });
        
        
        try {
            // 循环流式上报segment
            for (TraceSegment segment : data) {
                SegmentObject upstreamSegment = segment.transform();
                upstreamSegmentStreamObserver.onNext(upstreamSegment);
            }
        } catch (Throwable t) {
            LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
        }
        // 结束
        upstreamSegmentStreamObserver.onCompleted();
    }
    

    这里的upstreamSegmentStreamObserver工具是使用grpc-stub的grpc协议

    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>${grpc.version}</version>
    </dependency>
    

    使用流式数据传输的方式实现数据上报,上传的对象SegmentObject

    OAP采集

    OAP采集器主要在server-receiver-plugin包中,依然是使用插件的方式应对不同数据的采集,比如

    • skywalking-trace-receiver-plugin 分布式链路采集
    • skywalking-jvm-receiver-plugin jvm采集
    • 等等大概10几个采集插件

    这里主要分析分布式链路采集插件:skywalking-trace-receiver-plugin

    TraceSegmentReportServiceHandler

    分布式链路trace的采集主要入口TraceSegmentReportServiceHandler#collect方法,接受流式数据对象SegmentObject(对应了Agent端的上传对象)

    @Override
    public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
        // 使用grpc.stub的服务端
        return new StreamObserver<SegmentObject>() {
            // 收到数据的处理
            @Override
            public void onNext(SegmentObject segment) {
                try {
                    // 调用segmentParserService.send
                    segmentParserService.send(segment);
                } catch (Exception e) {
                    ...
                }
            }
            ...
        };
    }
    

    具体的Segment信息处理交由segmentParserService#send

    SegmentParse

    Segment信息的解析器,send方法如下

    public void send(SegmentObject segment) {
        final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
        traceAnalyzer.doAnalysis(segment);
    }
    

    转交给TraceAnalyzer分析并处理

    TraceAnalyzer

    其中doAnalysis负责分析Segment对象,方法如下

    // 解析SegmentO对象,基本都是订阅发布模式
    public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }
    
        createSpanListeners();
    
        notifySegmentListener(segmentObject);
    
        // 循环所有的span
        segmentObject.getSpansList().forEach(spanObject -> {
            if (spanObject.getSpanId() == 0) {
                // 解析第一个 Span
                notifyFirstListener(spanObject, segmentObject);
            }
    
            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                // 解析出口Span
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                // 解析入口Span
                notifyEntryListener(spanObject, segmentObject);
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                // 解析本地Span
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
            }
        });
        
        // 通知 Span 监听器们,执行构建各自的数据
        notifyListenerToBuild();
    }
    

    总结就是通知 Span 监听器们,去构建各自的数据,经过流式处理,最终存储到存储器。

    相关文章

      网友评论

          本文标题:Skywalking源码研究之上报和采集

          本文链接:https://www.haomeiwen.com/subject/uztkkjtx.html