美文网首页
SpringBoot整合Kafka

SpringBoot整合Kafka

作者: 你说我听杂谈 | 来源:发表于2020-07-22 17:01 被阅读0次

    一、Maven准备
    二、配置文件
    三、发送者
    四、接收者
    五、消息类
    六、在虚拟机中,查看接受者是否接受成功

    一、Maven准备

           <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>1.0.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
    

    二、配置文件

    
    #============== kafka ===================
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=192.168.71.129:9092
    
    #=============== provider  =======================
    
    spring.kafka.producer.retries=0
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    #=============== consumer  =======================
    # 指定默认消费者group id
    spring.kafka.consumer.group-id=test-consumer-group
    
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    三、发送者

            @Resource
           private KafkaTemplate<String, String> kafkaTemplate;
    
            Message message = new Message();
            message.setId( System.currentTimeMillis() );
            message.setMsg( respStr );
            message.setSendTime( new Date() );
            kafkaTemplate.send( "producersToConsumers_logs", gson.toJson( message ) );
    

    四、接受者

        @KafkaListener(topics = {"producersToConsumers_logs"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> optional = Optional.ofNullable( record.value() );
    
            if (optional.isPresent()) {
                Object msg = optional.get();
                log.info( "record:{}", record );
                log.info( "message:{}", msg );
            }
        }
    

    五、消息类

    @Data
    public class Message {
    
        private long id;
    
        private String msg;
    
        private Date SendTime;
    
    
    }
    

    六、在虚拟机中,查看接受者是否接受成功

    命令:

    ######启动zookeeper
    bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties
    #####启动kafka服务
    bin/kafka-server-start.sh config/server.properties
    #####查看集合
    bin/kafka-topics.sh --list --zookeeper  localhost:2181
    #####消费者查看信息
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic producersToConsumers_logs --from-beginning
    

    如图


    消费者接受信息

    注:配置文件只是做了简单的测试配置,在实际业务生成中,要根据业务要求,再详细配置
    参考官网API:https://www.springcloud.cc/apache-kafka-zhcn.html#api

    相关文章

      网友评论

          本文标题:SpringBoot整合Kafka

          本文链接:https://www.haomeiwen.com/subject/ebuzkktx.html