本文整合基于Springboot2.0+,kafka版本
kafka_2.12-2.3.0
,使用org.springframework.kafka
来做的整合
项目目录结构
项目目录结构pom.xml依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
application.yml配置文件
server:
port: 8081
spring:
kafka:
bootstrap-servers: http://ip1:9092,http://ip2:9092,http://ip3:9092
producer:
retries: 3
acks: all
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: consumer-group1
auto-offset-reset: earliest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 20000
listener:
concurrency: 3
ack-mode: MANUAL
本配置文件是才用的并发批量消费方式, bootstrap-servers是我们集群的机器地址
生产者controller
@RestController
@Slf4j
public class ProducerController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/send/{messge}")
public String sendmsg(@PathVariable String messge) {
//建议看一下KafkaTemplate的源码 很多api 我们可以指定分区发送消息
kafkaTemplate.send("test", messge); //使用kafka模板发送信息
String res = "消息:【" + messge + "】发送成功 SUCCESS !";
log.info(res);
return res;
}
}
消费者监听器
@Component
@Slf4j
public class ConsumerListener {
//建议看一下KafkaListener的源码 很多api 我们也可以指定分区消费消息
// topicPartitions ={@TopicPartition(topic = "topic1", partitions = { "0", "1" })}
@KafkaListener(topics = "test", groupId = "consumer-group")
public void listen(List<String> list, Acknowledgment ack) {
log.info("本次批量拉取数量:" + list.size() + " 开始消费....");
List<String> msgList = new ArrayList<>();
for (String record : list) {
Optional<?> kafkaMessage = Optional.ofNullable(record);
// 获取消息
kafkaMessage.ifPresent(o -> msgList.add(o.toString()));
}
if (msgList.size() > 0) {
for (String msg : msgList) {
log.info("开始消费消息【" + msg + "】");
}
// 更新索引
// updateES(messages);
}
//手动提交offset
ack.acknowledge();
msgList.clear();
log.info("消费结束");
}
}
我们的消费者监听器才用的并发批量下拉数据 才用手动提交方式避免消息丢失
启动类
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
启动程序并且生产消息
图片.png通启动日志我们可以看到我们成功连接到kafka集群
kafka生产和消费日志信息
图片.png
这里我们也可以通过批量生产消息 改变配置文件的并发参数和批量下拉参数来做批量并发消费
我们这里topic设置的为test groupId为consumer-group
网友评论