1. 背景
本文简述 kafka 的相关内容。
2.知识
更多基础知识见:https://www.jianshu.com/p/bee2152f476c
如何安装 kafka 见:https://www.jianshu.com/p/8a076052a9ad
3. 示例
3.1 配置一个“生产者”
1、添加依赖
新建一个项目,并添加依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置kafka的服务地址
在配置文件 application.yml 中配置。
server:
port: 8081
spring:
application:
name: "producer"
kafka:
bootstrap-servers: "localhost:9092"
3、创建topic
我使用 java 来创建 topic ,注入一个 NewTopic 对象即可。
@Component
public class KafkaConfig {
private static final String TOPIC_NAME = "topic2";
// 创建一个主题 topic
@Bean
public NewTopic topic1() {
return TopicBuilder.name(TOPIC_NAME)
.partitions(1)
.replicas(1)
.compact()
.build();
}
}
4、发送消息
首先需要注入一个 kafkaTemplate 对象。这个是个 kafka 基础操作的模板方法类。springboot 框架已经帮忙配置好了,直接注入即可。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
然后直接 send 发送消息
private static final String TOPIC_NAME = "topic2";
kafkaTemplate.send(TOPIC_NAME, data);
就是这么简单省事。
3.2 配置一个“消费者 ”
1、添加依赖
新建一个项目,并添加依赖同上。
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置kafka服务器地址
修改 application.yml ,示例:
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup1"
client-id: "myGroup1"
3、监听消息
KafkaListener 这个注解 指示到一个方法上即可。
格式:
@KafkaListener(topics = TOPIC_NAME)
public void someOne(String content){
....
}
我的示例:
@Component
public class MyKafkaConsumer {
private static final String TOPIC_NAME = "topic2";
@KafkaListener(topics = TOPIC_NAME)
public void processMessage(ConsumerRecord<String, String> record) {
System.out.println(String.format("# record: %s", record));
System.out.println(String.format("\t\t# 收到消息: %s", record.value()));
}
}
4. 扩展
Spring-kafka 的文件值得一下看:https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics
我的代码示例见:https://github.com/vir56k/java_demo/tree/master/kafka_demo1
5. 参考
Springboot 官网文档介绍
https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.spring-application
网友评论