美文网首页
Skywalking OAP 源码分析-- kafka-fetc

Skywalking OAP 源码分析-- kafka-fetc

作者: 金刚_30bf | 来源:发表于2021-04-01 17:30 被阅读0次

    版本v8.4.0

    关于Kafka-fetcher-plugin

    kafka-fetcher-plugin 是Skywalking oap的一个可选module,名称为 "kafka-fetcher" ,它用来从kafka读取agent上送信息,一般与agent端的kafka-reporter-plugin配合使用。
    通过在oap的配置文件application.yml 中启用该module,并配置相关kafka参数。

    kafka-fetcher:
      selector: ${SW_KAFKA_FETCHER:default}
      default:
        bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
    

    topic的配置和自动创建topic

    KafkaFetcherConfig类中有topic的默认名字,如果不在配置文件中修改,则默认使用之。

        private String configPath = "meter-analyzer-config";
        private String topicNameOfMetrics = "skywalking-metrics";
        private String topicNameOfProfiling = "skywalking-profilings";
        private String topicNameOfTracingSegments = "skywalking-segments";
        private String topicNameOfManagements = "skywalking-managements";
        private String topicNameOfMeters = "skywalking-meters";
        private String topicNameOfLogs = "skywalking-logs";
        private boolean createTopicIfNotExist = true;
    
        private boolean createTopicIfNotExist = true; 
        private int partitions = 3;
        private int replicationFactor = 2;
    

    另外,默认情况下,oap启动后会去检查相关topic是否已经创建,否则会自动创建。 见代码 KafkaFetcherHandlerRegister.java的构造函数: (当前版本createTopicIfNotExist 这个参数并未使用)

            if (!missedTopics.isEmpty()) {
                log.info("Topics" + missedTopics.toString() + " not exist.");
                List<NewTopic> newTopicList = missedTopics.stream()
                                                          .map(topic -> new NewTopic(
                                                              topic,
                                                              config.getPartitions(),
                                                              (short) config.getReplicationFactor()
                                                          )).collect(Collectors.toList());
    
                try {
                    adminClient.createTopics(newTopicList).all().get();
                } catch (Exception e) {
                    throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
                }
            }
    

    在该构造函数中还有线程池的创建:

            executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
                                              60, TimeUnit.SECONDS,
                                              new ArrayBlockingQueue(threadPoolQueueSize),
                                              new CustomThreadFactory("KafkaConsumer"),
                                              new ThreadPoolExecutor.CallerRunsPolicy()
            );
    

    这里使用的拒绝策略是CallerRunsPolicy, 意思是当线程池和队列满了后,调用者线程会来执行这个任务。

    消费消息

    在kafkaFetcherProvider 被oap加载后,会调用它的start方法,在start方法中会调用handlerRegister.start();

    // 实现了Runnable接口
    public class KafkaFetcherHandlerRegister implements Runnable {} 
    
        public void start() {
            // start方法将自己提交到线程池中执行
            executor.submit(this);
        }
    
    // run方法循环使用consumer拉取消息进行消费 ,拉取间隔为500ms 
        public void run() {
            while (true) {
                    ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
                        while (iterator.hasNext()) {
                            ConsumerRecord<String, Bytes> record = iterator.next();
                            executor.submit(() -> handlerMap.get(record.topic()).handle(record));
                        }
              }
        }
    

    对于拉取到的每条消息, 根据消息的topic从handlerMap中获取对应的handler,让后将handler.handle() 提交给线程池进行处理。

    topic 与 handler

    kafkaFetcherProvider.start() 中可以看到topic和对应handler的关系:

            handlerRegister.register(new JVMMetricsHandler(getManager(), config));
            handlerRegister.register(new ServiceManagementHandler(getManager(), config));
            handlerRegister.register(new TraceSegmentHandler(getManager(), config));
            handlerRegister.register(new ProfileTaskHandler(getManager(), config));
    
            if (config.isEnableMeterSystem()) {
                handlerRegister.register(new MeterServiceHandler(getManager(), config));
            }
            if (config.isEnableLog()) {
                handlerRegister.register(new LogHandler(getManager(), config));
            }
    

    总结:

    oap的kafka-fetcher模块从kafka读取消息,然后根据topic获取响应的handler提交给线程池进行处理。

    相关文章

      网友评论

          本文标题:Skywalking OAP 源码分析-- kafka-fetc

          本文链接:https://www.haomeiwen.com/subject/ssvtkltx.html