pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
# kafka 服务地址
spring.kafka.bootstrap-servers=10.7.221.25:9092,10.7.221.26:9092,10.7.221.31:9092
# 生产消息key和value的编码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费者的group-id
spring.kafka.consumer.group-id=test
# 消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=true
# 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=1000
# 消费消息key和value的解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
生产者
package com.yechom.springbootkafkademo.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("kafkaProducer")
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("product/{message}")
public String send(@PathVariable String message){
kafkaTemplate.send("test",message);
return "kafka消息发出成功:" + message;
}
}
消费者
package com.yechom.springbootkafkademo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics ="test")
public void listener(ConsumerRecord record){
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
测试结果
1590506946(1).jpg 1590507035(1).jpg注意
-
不同版本的kafka包可能会报 org.springframework.kafka.annotation.KafkaListener 找不到,pom文件不加版本会自动拉最新的版本
-
spring.kafka.consumer.enable-auto-commit设置消费者的偏移量后台定期提交,在spring boot环境下,设置为false时,spring boot也会自动后台提交偏移量
网友评论