美文网首页程序员
【方案白嫖】Kafka如何监听动态改变的topic

【方案白嫖】Kafka如何监听动态改变的topic

作者: 橙皇cc | 来源:发表于2020-11-29 22:39 被阅读0次

    问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。

    方案一:

    如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。

    @Configuration
    @EnableKafka
    public class KafkaConfig {
    
        private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
        private static final String LOCAL_GROUP_ID = "cctest";
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
        kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
            // kv都用string来序列化
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
    
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @KafkaListener(topicPattern = "${topicPattern}")
        public void listen(String data) {
            System.out.println("message have been consumed:"+data);
        }
    }
    
    

    方案弊端:

    1. 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
    2. 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。

    方案二:

    用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。

    管理消费者bean
    @Component
    @Data
    public class ResourceNotifyConsumer {
    
        private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);
    
        @Autowired
        // 消费数据的service,异步处理
        private MonitorPoolService monitorPoolService;
    
        private KafkaConsumer<String, String> consumer = null;
    
        public void closeConsumer() {
            // 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
            //  同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
            consumer = null;
        }
    
        public void onMessage() {
            while (consumer != null) {
                // 从kafka中取出100毫秒的数据
                List<Map<String, Object>> datas = new ArrayList<>();
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> recordData = new HashMap<>(8);
                    recordData.put(record.key(), record.value());
                    datas.add(recordData);
                }
                // 处理消息
                if (CollectionUtils.isNotEmpty(datas)) {
                    monitorPoolService.dealResource(datas);
                }
            }
        }
    }
    
    
    定时调用接口
    @GetMapping("/check")
    public ResponseMessage checkConfig() {
        // 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
        List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
        List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
            resourceNotifyConsumer.closeConsumer();
            // 新的监听topic集合
            List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
            // 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
            resourceService.buildConsumer(topics);
            // 更新配置的状态
            topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
            topicConfigDao.saveAll(topicWithoutListening);
        }
        return ResponseMessage.success();
    
    

    方案弊端:

    1. 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
    2. 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。

    方案三:

    1. 独立kafka消费模块为一个单独的jar文件
    2. 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程

    方案弊端:

    1. 能在模块内解决的问题,尽量不变复杂。。

    总结:

    最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。

    相关文章

      网友评论

        本文标题:【方案白嫖】Kafka如何监听动态改变的topic

        本文链接:https://www.haomeiwen.com/subject/mbjlwktx.html