美文网首页springbootkafkaJAVA
spring boot与kafka集成(spring boot

spring boot与kafka集成(spring boot

作者: SamHxm | 来源:发表于2017-03-05 20:33 被阅读10401次

    随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便。

    引入依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    具体spring-kafka的版本由spring boot的当前版本决定。

    application.properties配置文件
    spring.kafka.bootstrap-servers=192.168.1.107:9092
    spring.kafka.consumer.group-id=myGroup
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    

    最简化的配置仅需指定kafka主机和消息者组名即可。这里使用的是单节点kafka,集群环境中配置多个kafka主机地址即可。例如:

    spring.kafka.bootstrap-servers=192.168.1.107:9092,192.168.1.108:9092,192.168.1.109:9092
    

    以下4项配置指定消息key和消息体的编解码方式。

    spring.kafka.consumer.key-deserializer
    spring.kafka.consumer.value-deserializer
    spring.kafka.producer.key-serializer
    spring.kafka.producer.value-serializer
    
    消息对象
    import java.util.Date;
    
    public class Message {
    
        private Long id;
        private String msg;
        private Date sendTime;
        public Long getId() {
            return id;
        }
        public void setId(Long id) {
            this.id = id;
        }
        public String getMsg() {
            return msg;
        }
        public void setMsg(String msg) {
            this.msg = msg;
        }
        public Date getSendTime() {
            return sendTime;
        }
        public void setSendTime(Date sendTime) {
            this.sendTime = sendTime;
        }
    }
    
    消息生产者
    import java.util.Date;
    
    import java.util.UUID;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    
    @Component
    public class Sender {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        private Gson gson = new GsonBuilder().create();
        
        public void sendMessage(){
            Message m = new Message();
            m.setId(System.currentTimeMillis());
            m.setMsg(UUID.randomUUID().toString());
            m.setSendTime(new Date());
            kafkaTemplate.send("test1", gson.toJson(m));
        }
    }
    
    消息消费者
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    
    @Component
    public class Receiver {
        
        private Gson gson = new GsonBuilder().create();
        
        @KafkaListener(topics = "test1")
        public void processMessage(String content) {
            Message m = gson.fromJson(content, Message.class);
        }
    }
    
    运行
    @SpringBootApplication
    public class AppStart  {
        public static void main(String[] args) throws InterruptedException {
    
            ApplicationContext app = SpringApplication.run(AppStart.class, args);
            
            while(true){
                Sender sender = app.getBean(Sender.class);
                sender.sendMessage();
                Thread.sleep(500);
            }
        }
        
    }
    
    通过上面的示例可以发现,相对于spring boot 1.4.x版本,1.5集成kafka主要是将以前需要手工编码进行设置的kafka配置改由spring配置文件定义。
    注意

    我使用的spring boot版本是1.5.1,spring-kafka版本1.1.2,jdk1.8,该组合似乎不支持低版本的kafka。之前我使用kafka版本为2.11-0.10.0.0,向kafka发送消息时一直产生异常,后来升级kafka版本至2.11-0.10.2.0故障消失。由于测试时间有限,未作进一步分析。希望查明原因的同学能私信我。谢谢。

    相关文章

      网友评论

      • 文静不是安静:这样消费的时候只能把主题写死在程序里,这样不太好吧
        种豆那山下_:@KafkaListener(topics = "${kafka.topic.payNotify}")
        public void receive(String payload) {
        log.info("received payload='{}'",payload);
        }
      • o0风依旧0o:能把您的完整代码发出来吗
      • beykery:不对,消费者有问题,抛出一堆异常。
        SamHxm:@beykery 还是检查下你的代码吧!
      • Breeze_270a:生产者用命令可以取到 但是代码里的消费者没有反应是怎么回事呢?
        Leo_shell:@Breeze_270a 我也是这个问题,请问有解决办法了吗?
        Breeze_270a:@SamHxm springboot 1.5.2 spring-kafka 1.2.0 和相应的kafka服务端 就是按你的文章配置的 我也看官方文档了 除了这些还有什么配置吗?方便加个QQ 80491491 请指教下 谢谢
        SamHxm:@Breeze_270a 是否是版本问题?

      本文标题:spring boot与kafka集成(spring boot

      本文链接:https://www.haomeiwen.com/subject/bzvegttx.html