美文网首页
SpringBoot整合Kafka

SpringBoot整合Kafka

作者: 你说我听杂谈 | 来源:发表于2020-07-22 17:01 被阅读0次

一、Maven准备
二、配置文件
三、发送者
四、接收者
五、消息类
六、在虚拟机中,查看接受者是否接受成功

一、Maven准备

       <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

二、配置文件


#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.71.129:9092

#=============== provider  =======================

spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

三、发送者

        @Resource
       private KafkaTemplate<String, String> kafkaTemplate;

        Message message = new Message();
        message.setId( System.currentTimeMillis() );
        message.setMsg( respStr );
        message.setSendTime( new Date() );
        kafkaTemplate.send( "producersToConsumers_logs", gson.toJson( message ) );

四、接受者

    @KafkaListener(topics = {"producersToConsumers_logs"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> optional = Optional.ofNullable( record.value() );

        if (optional.isPresent()) {
            Object msg = optional.get();
            log.info( "record:{}", record );
            log.info( "message:{}", msg );
        }
    }

五、消息类

@Data
public class Message {

    private long id;

    private String msg;

    private Date SendTime;


}

六、在虚拟机中,查看接受者是否接受成功

命令:

######启动zookeeper
bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties
#####启动kafka服务
bin/kafka-server-start.sh config/server.properties
#####查看集合
bin/kafka-topics.sh --list --zookeeper  localhost:2181
#####消费者查看信息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic producersToConsumers_logs --from-beginning

如图


消费者接受信息

注:配置文件只是做了简单的测试配置,在实际业务生成中,要根据业务要求,再详细配置
参考官网API:https://www.springcloud.cc/apache-kafka-zhcn.html#api

相关文章

网友评论

      本文标题:SpringBoot整合Kafka

      本文链接:https://www.haomeiwen.com/subject/ebuzkktx.html