美文网首页Javaweb
Springboot注解@KafkaListener实现Kafk

Springboot注解@KafkaListener实现Kafk

作者: Liuzz25 | 来源:发表于2018-12-05 11:39 被阅读6次

    在使用时Kafka时,经常遇到大批量消息在队列中,如果一个消息一个消息的消费的话效率太低下了,所以批量消费消息是很有必要的,废话不多数,直接上代码。

    批量监听器

    从版本1.1开始,@KafkaListener可以被配置为批量接收从Kafka话题队列中的Message。要配置监听器容器工厂以创建批处理侦听器,需要设置batchListener属性为true,代码如下:

    @Bean
    KafkaListenerContainerFactory<?> batchFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
            ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
      factory.setBatchListener(true); // 开启批量监听
      return factory;
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //设置每次接收Message的数量
      props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
      props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }
    
    批量接收

    在@KafkaListener注解中声明工厂为batchFactory().

    @KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<?, ?>> list) {
      List<String> messages = new ArrayList<>();
      for (ConsumerRecord<?, ?> record : list) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        // 获取消息
        kafkaMessage.ifPresent(o -> messages.add(o.toString()));
      }
      if (messages.size() > 0) {
        // 更新索引
        updateES(messages);
      }
    }
    

    相关文章

      网友评论

        本文标题:Springboot注解@KafkaListener实现Kafk

        本文链接:https://www.haomeiwen.com/subject/kcsicqtx.html