spring boot与kafka集成

作者: SamHxm | 来源:发表于2016-12-02 23:40 被阅读9984次

    引入相关依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    

    从依赖项的引入即可看出,当前spring boot(1.4.2)还不支持完全以配置项的配置来实现与kafka的无缝集成。也就意味着必须通过java config的方式进行手工配置。

    定义kafka基础配置

    与redisTemplate及jdbcTemplate等类似。spring同样提供了org.springframework.kafka.core.KafkaTemplate作为kafka相关api操作的入口。

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
        
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }
    

    KafkaTemplate依赖于ProducerFactory,而创建ProducerFactory时则通过一个Map指定kafka相关配置参数。通过KafkaTemplate对象即可实现消息发送。

    kafkaTemplate.send("test-topic", "hello");
    or
    kafkaTemplate.send("test-topic", "key-1", "hello");
    

    监听消息配置

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @EnableKafka
    public class KafkaConsumerConfig {
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }
    
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> propsMap = new HashMap<>();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return propsMap;
        }
    
        @Bean
        public Listener listener() {
            return new Listener();
        }
    }
    

    实现消息监听的最终目标是得到监听器对象。该监听器对象自行实现。

    import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.springframework.kafka.annotation.KafkaListener;
        
        import java.util.Optional;
        
        public class Listener {
    
        @KafkaListener(topics = {"test-topic"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                System.out.println("listen1 " + message);
            }
        }
    }
    

    只需用@KafkaListener指定哪个方法处理消息即可。同时指定该方法用于监听kafka中哪些topic。

    注意事项

    定义监听消息配置时,GROUP_ID_CONFIG配置项的值用于指定消费者组的名称,如果同组中存在多个监听器对象则只有一个监听器对象能收到消息。

    @KafkaListener中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkaTemplate在发送消息时指定。

    KEY_DESERIALIZER_CLASS_CONFIG与VALUE_DESERIALIZER_CLASS_CONFIG指定key和value的编码、解码策略。kafka用key值确定value存放在哪个分区中。

    后记

    时间是解决问题的有效手段之一。

    在spring boot 1.5版本中即可实现spring boot与kafka Auto-configuration

    相关文章

      网友评论

      • 470931a5ddd0:我想开多个消费者,有详细说明没有呢
      • 126476fa4ec8:消费的时候不能动态指定topic么
        晓龙_669c:@SamHxm 如何实现呢?springboot 与kafka 结合动态指定topic
        SamHxm:@Donald_d248 用注解方式不行,可以用原生api实现
      • 653b375fc989:您好,你码云上的那个demo,消费者为什么接收不到消息呢?
      • 8615af07545e:楼主请问,kafka做什么的,不用集群吗,集群不使用zookeeper吗,最近也在学习springboot
        SamHxm:kafaka是一个分布式的消息队列中间件,它依赖zookeeper,平时在开发中,我们往往使用单机或伪集群模式。根据业务场景,也可以独立使用zookeeper,完成分布式控制相关的功能实现。它们和springboot没有必然联系,根据需要灵活选择。
      • 紫玥迩:这个里面配置了propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);那怎么手动提交offset呀
        紫玥迩:哦 我知道了,需要实现AcknowledgingMessageListener
        紫玥迩:@SamHxm public void processMessage(String content, Acknowledgment ack)这样的监听方法会报错吧,,Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment]
        SamHxm:@紫玥迩 使用spring boot 1.5以上版本,在application.properties中配置spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE,在接收消息的代码中使用Acknowledgment对象的acknowledge()方法手动提交offset。例如: @KafkaListener(topics = "test2")
        public void processMessage(String content, Acknowledgment ack) {
        logger.info("message={}", content);
        ack.acknowledge();
        }
      • d5c633ba3600:问什么我的消费者没有任何反应呢
        d5c633ba3600:@SamHxm 还是一样,demo也收不到消息,kafka-console-consumer.sh测试是可以消费消息的
        SamHxm:@anf0506 可以看看我的demo.https://git.oschina.net/samhxm/spring-boot-integration.git 用devlop分支
        SamHxm:@anf0506 1.生产者是否发送成功。2.消费者是否正常监听。 仔细检查吧,另外注意版本问题。
      • sorry_zt_JavaTu:kafkaTemplate.send("test-topic", "hello");
        or
        kafkaTemplate.send("test-topic", "key-1", "hello");我不明白作者您这是在那个文件里写的,我刚刚学习springboot,真的不清楚这个
        sorry_zt_JavaTu:@SamHxm 嗯嗯
        SamHxm:这是模拟消息发送的代码,写在消息发送的类里,往往消息发送和消息接收不在同一个项目中,所以这块代码通常写在需要发送消息的项目中。
      • 418ae0ea9305:请问楼主,使用@KafkaListener(topics = {"test-topic"}) 监听器消费消息,是否可以控制一次接收消息的最小数量 这种更细粒度的控制
        418ae0ea9305:@SamHxm 谢谢了,看来我只能通过配置xml 来实现了.
        SamHxm: @418ae0ea9305 据我所知,无法实现
      • yiqiquhuxi:Error reading field 'brokers': Error reading field 'host': Error reading string of length 12601, only 97 bytes available
        SamHxm:@General008 如果确认配置没错的话,检查网络及主机环境了。
        yiqiquhuxi:@SamHxm 我这个是启动时候报错。
        SamHxm:@General008 检查网络情况,应传输12601字节,但应用只收到97字节。
      • yiqiquhuxi:有问题
        江南小帅:@General008 jdk1.8
        yiqiquhuxi:@SamHxm Optional.ofNullable()方法找不到
        SamHxm: @General008 请详细说明

      本文标题:spring boot与kafka集成

      本文链接:https://www.haomeiwen.com/subject/cdnqmttx.html