Api操作
1.生产者
1.1 创建
通过之前的介绍可知,生产者主要往kafka
中发布消息,因此在发送消息之前先创建topic
创建topic
命令如下:
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --create --topic test_topic --replication-factor 1 --partitions 1
关于命令解释如下:
kafka-topics.sh
topic
相关的脚本
--bootstrap-server
broker
的地址清单
--create
代表创建 topic
--topic test_topic
指定topic
的名字
--replication-factor
副本数
--partitions
指定分区数
1.2 查看
查询所有的topic
kafka-topics.sh --list --bootstrap-server 192.168.80.110:9092
![](https://img.haomeiwen.com/i18110702/67dea7ac8257cb01.png)
1.3 修改
修改topic
分区数
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --alter --topic test_topic --partitions 2
--alter
代表修改topic
注意:分区数只能增加,不能修改,如果非要修改则出现以下错误
![](https://img.haomeiwen.com/i18110702/bccca494119b8002.png)
修改副本数
单节点修改副本数没有意义,等到后面集群章节再来讨论
1.4 删除
删除topic
kafka-topics.sh --bootstrap-server 192.168.80.110:9092 --delete --topic test_topic
1.5 发布
kafka-console-producer.sh --bootstrap-server 192.168.80.110:9092 --topic my_topic
注意: my_topic之前没有创建,向my_topic发送消息会自动创建
topic
![](https://img.haomeiwen.com/i18110702/a60d25e79cbb6749.png)
1.6 监控
在之前的章节搭建了kafka-eagle
平台,在这个平台也可以监控一些kakfa
数据,登录该平台,就可以看到相对应消息
![](https://img.haomeiwen.com/i18110702/805a93c3cce3f318.png)
1.6.1 创建
选择Topics/create
也可以创建topic
,如下:
![](https://img.haomeiwen.com/i18110702/883b0a5d19f7823a.png)
1.6.2 查看
选择Topics/List
则可以查看所有的topic
,如下:
![](https://img.haomeiwen.com/i18110702/f80b9b1584cd99c0.png)
1.6.3 ksql
当然也可以选择Topics/KSQL
,通过ksql
语句来查看某个topic
里面的消息
![](https://img.haomeiwen.com/i18110702/679176662118eb96.png)
![](https://img.haomeiwen.com/i18110702/1147e55c2f4682f5.png)
关于KSQL
这里暂时不做太多阐述,比较简单,有兴趣可以翻阅官方文档
1.6.4 发布
可以选择Topics/Mock
给某个topic
发送消息
![](https://img.haomeiwen.com/i18110702/bc2fa1e0624474e7.png)
通过对比发现
kafka-eagle
去发布消息比起命令行更加容易
2. 消费者
2.1 消费
kafka-console-consumer.sh --bootstrap-server 192.168.80.110:9092 --from-beginning --group wangzh -topic my_topic
--from-beginning
代表从头开始消费
--group wangzh
代表消费者组
每个消费者都属于一个消费者组,一个消费者组可以拥有多个消费者,
一个消费者组对一个
topic
里面的消费消息只能读取一次
每个消息在partition
上都有一个自增长的序号,当消费者每次消费到消息就会记录消费的序号,当下次消费就会从序号开始消费而不是从头开始,这个需要称为偏移量
![](https://img.haomeiwen.com/i18110702/4c39119ac3c79519.png)
offset
(偏移量)保存在__consumer_offsets
中,这样当消费者因为某种原因断开与kafka
的连接,当下次消费时也不会从头消费
2.2 多个
当然也可以消费多个topic
的消息,而不一定是消费一个topic
消费之前先往my_topic
和test_topic
中发布一些消息,如下
![](https://img.haomeiwen.com/i18110702/6680f317a2f68c35.png)
![](https://img.haomeiwen.com/i18110702/4aa36533355b6b17.png)
消费多个topic
kafka-console-consumer.sh --bootstrap-server 192.168.80.110:9092 --from-beginning --group wangzh --whitelist "my_topic|test_topic"
![](https://img.haomeiwen.com/i18110702/d07cdba324ab89d6.png)
可以看到两个
topic
的消息都被消费出来了
2.3 查看
查看消费者组
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --list
查看消费者组偏移量
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --describe --group wangzh
![](https://img.haomeiwen.com/i18110702/1d1cbc3740c31052.png)
当然也可以在kafka-eagle
看到消费者组的信息,如下
![](https://img.haomeiwen.com/i18110702/99e1f4d52b5f4789.png)
甚至可以看到这个消费者组的偏移量,如下:
![](https://img.haomeiwen.com/i18110702/67cb3d91248e2b78.png)
2.4 删除
kafka-consumer-groups.sh --bootstrap-server 192.168.80.110:9092 --delete --group wangzh
消费组中当前没有消费者时才能删除
网友评论