美文网首页
13、Skywalking的埋点数据发送-基于kafka的默认发

13、Skywalking的埋点数据发送-基于kafka的默认发

作者: rock_fish | 来源:发表于2021-06-23 14:02 被阅读0次
TraceSegment 基于Kafka的发送

基于kafka的数据推送和消费中,有这么一个疑问萦绕:OAPServer的数据聚合是怎么实现的?

  • 如果应用A的实例AA的数据都提交在同一个OAPServer实例中,那么在这个实例中就可以进行数据聚合
  • 如果应用A的实例AA的数据提交在多个不同一个OAPServer实例中,那么每个实例中聚合之后,还要再把每个实例聚合后的数据再进行一次聚合。

其大致的处理流程是这样的:采集->发送kafka->OAP消费kafka->聚合/存储

这里先看采集和发送的处理,OAP的处理在OAP端流程梳理的章节看

消息发送的分区策略

KafkaTraceSegmentServiceClient中梳理一下2个知识点:
1.看发送的消息是否有key 和分区信息
2.看分区算法

1. 看消息构造

KafkaTraceSegmentServiceClient#consume中可以看到构造的ProducerRecord中 指定了topic和key

public void consume(final List<TraceSegment> data) {
        data.forEach(traceSegment -> {
            SegmentObject upstreamSegment = traceSegment.transform();
            ProducerRecord<String, Bytes> record = new ProducerRecord<>(
                topic,//指定了topic
                upstreamSegment.getTraceSegmentId(),//指定了key
                Bytes.wrap(upstreamSegment.toByteArray())
            );
           ...省略部分代码
        });
    }
2. 看分区算法

默认情况下未指定Partitioner,则使用Kafka默认的DefaultPartitioner的分区算法
从源码可知,指定了key,则会按照key做hash算出分区,key不变的情况下,所算出的分区是一致的

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
3.推理

根据1,2可知,在使用默认的分区策略的情况下,相同的TraceSegmentId 消息提交到同一个kafka分区中。

4.TraceSegmentId 的生命周期

TraceSegment的构造中给traceSegmentId进行了赋值。

public TraceSegment() {
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.spans = new LinkedList<>();
        this.relatedGlobalTraceId = new NewDistributedTraceId();
        this.createTime = System.currentTimeMillis();
    }

一个TraceSegment有唯一的一个TraceSegmentId,接下来就需要梳理清楚是不是一次调用在一个进程内就有一个TraceSegment呢?先给出答案进程创建第一个EntrySpan的时候会先创建一个TraceSegment,其内部承载了一个span集合。

梳理源码的入口是这样的:对于skywalking来说,进程的入口点是创建一个EntrySpan,所以我们从ContextManager#createEntrySpan 源码中跟踪下去就能寻找到答案:

    public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
        AbstractSpan span;
        AbstractTracerContext context;
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
       ...
       context = getOrCreate(operationName, false);//这里有创建Context的机会
       span = context.createEntrySpan(operationName);
        ...
        return span;
    }

context = getOrCreate(operationName, false)这个方法中,会有创建TraceContext的逻辑:

 context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);

继续跟入代码,可以看到new TracingContext的操作

context = new TracingContext(operationName, spanLimitWatcher);

TracingContext 中有个成员 private TraceSegment segment,在TracingContext构造方法中可以看到TraceSegment的创建过程

TracingContext(String firstOPName, SpanLimitWatcher spanLimitWatcher) {
        this.segment = new TraceSegment();
        ...
}

相关文章

  • 13、Skywalking的埋点数据发送-基于kafka的默认发

    TraceSegment 基于Kafka的发送 基于kafka的数据推送和消费中,有这么一个疑问萦绕:OAPSer...

  • kafka基础

    kafka架构: 1)大数据领域 2)数据集成 3)流计算集成 kafka消息生产者发送消息是批量发送,默认是16...

  • Gson转换错误导致Int变为Double类型

    问题描述 埋点系统负责接收客户端、H5等系统发送过来的用户行为埋点数据,经过统一的接收、解析,最终发到Kafka中...

  • 埋点

    如何进⾏埋点 埋点原理 对基于⽤户⾏为的数据平台来说,发⽣在⽤户界⾯的,能获取⽤户信息的触点就是⽤户数据的直接来源...

  • 12、Skywalking的埋点-TraceSegment数据的

    采样创建TraceSegment 默认情况下,每个请求都应该生成一条完整的 Trace。但在面对海量请求时这就会给...

  • 埋点

    埋点分为客户端埋点和服务端埋点。客户端埋点通过restful api请求json数据写入kafka中,可以单条请求...

  • Kafka常用命令

    本文基于Kafka 0.10.0 基本操作 列出所有topic 创建topic 生产数据 向earth发送一条消息...

  • kafka 同步、异步发送

    kafka producer默认是异步发送: 在初始化producer实例时,会创建一个sender线程负责批量发...

  • Android埋点技术总结

    1.埋点技术的分类 1.1 代码埋点:代码埋点是指在某个事件发生时调用数据发送接口上报数据。例如开发人员按照产品/...

  • Spark Streaming读写Kafka,将offset写入

    摘要: Spark Streaming, Kafka,zookeeper Scala调用Kafka API发送数据...

网友评论

      本文标题:13、Skywalking的埋点数据发送-基于kafka的默认发

      本文链接:https://www.haomeiwen.com/subject/jmpdrltx.html