美文网首页
spring boot集成kafka

spring boot集成kafka

作者: Yechom | 来源:发表于2020-05-26 23:34 被阅读0次

    pom.xml

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
    
      <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
      </dependency>
    

    application.properties

    # kafka 服务地址
    spring.kafka.bootstrap-servers=10.7.221.25:9092,10.7.221.26:9092,10.7.221.31:9092
    # 生产消息key和value的编码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 消费者的group-id
    spring.kafka.consumer.group-id=test
    # 消费者的偏移量将在后台定期提交,默认值为true
    spring.kafka.consumer.enable-auto-commit=true
    # 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
    spring.kafka.consumer.auto-commit-interval=1000
    # 消费消息key和value的解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    生产者

    package com.yechom.springbootkafkademo.kafka;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequestMapping("kafkaProducer")
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        @RequestMapping("product/{message}")
        public String send(@PathVariable String message){
            kafkaTemplate.send("test",message);
            return "kafka消息发出成功:" + message;
         }
    }
    

    消费者

    package com.yechom.springbootkafkademo.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics ="test")
        public void listener(ConsumerRecord record){
            System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
        }
    }
    

    测试结果

    1590506946(1).jpg 1590507035(1).jpg

    注意

    1. 不同版本的kafka包可能会报 org.springframework.kafka.annotation.KafkaListener 找不到,pom文件不加版本会自动拉最新的版本

    2. spring.kafka.consumer.enable-auto-commit设置消费者的偏移量后台定期提交,在spring boot环境下,设置为false时,spring boot也会自动后台提交偏移量

    相关文章

      网友评论

          本文标题:spring boot集成kafka

          本文链接:https://www.haomeiwen.com/subject/qtybahtx.html