美文网首页
Spring集成kafka态订阅消费消息

Spring集成kafka态订阅消费消息

作者: 和平菌 | 来源:发表于2019-08-19 17:27 被阅读0次

    1、消息处理的类
    定义一个类,继承MessageListener接口来处理消息

    public class KakaMessageListener implements MessageListener<String, String> {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            String topic = record.topic();
            String content = record.value();
        }
    }
    

    2、配置Kafka的Consumer

    @Data
    @Component
    @ConfigurationProperties(prefix = "kafkaconfig")
    @EnableKafka
    public class KafkaConfig {
    
        private String servers;
        private String groupid;
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(6);
            factory.getContainerProperties().setPollTimeout(1500);
    
            return factory;
        }
    
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
    
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
            return propsMap;
        }
    }
    

    3、动态的进行订阅和取消订阅

    @Autowired
        KakaMessageListener kakaMessageListener;
    
        @Autowired
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory;
    
        public static ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>> cache = new ConcurrentHashMap<>();
    
        @Override
        public void subscribe(String topic) {
            ConcurrentMessageListenerContainer<String, String> container = null;
            if(cache.containsKey(topic)){
                container = cache.get(topic);
            }
    
            if(container == null){
                container = factory.createContainer(topic);
                cache.put(topic, container);
            }
    
            container.setupMessageListener(kakaMessageListener);
            container.start();
    
            log.info("订阅kafka消息:" + topic);
        }
    
        @Override
        public void unSubscribe(String topic) {
            if(cache.containsKey(topic)){
                ConcurrentMessageListenerContainer<String, String> container = cache.get(topic);
                if(container != null){
                    container.stop();
                    log.info("取消订阅kafka消息:" + topic);
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:Spring集成kafka态订阅消费消息

          本文链接:https://www.haomeiwen.com/subject/oosesctx.html