topic
1. 查看topic
选项说明:
- --list :查看kafka所有的topic
- --bootstrap-server : 连接kafka集群
- --hadoop102:9092:hadoop102是指连接kafka任意一台机器,9092:kafka内部通信的端口
kafka-topics.sh --bootstrap-server hadoop102:9092 --list
2. 创建topic
选项说明:
- --topic : 定义topic名字
- --partitions : 定义分区数
- --replication-factor : 定义副本数
kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic first
--partitions 2 --replication-factor 2
3. 查看某个Topic的详情
选项说明:
- --topic first : 查看指定的话题,如果不加此选项,则表示查看所有的话题
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
4.修改分区数
说明:
- --分区数只能增加不能减少 ,要不然消减的分区数据不好处理
- 分区内部消息有序,分区之间消息无序
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 6
5. 发送消息
选项说明:
- hadoop102:9092,hadoop103:9092,hadoop104:9092 : kafka的集群中的broker,
其实写一个也是可以的,写3个的目的只是避免当连接的kafka集群broker故障时连不上kafka集群的情况。
kafka-console-producer.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092
--topic first
6. 消费消息
选项说明:
- --from-beginning :
加上:会把topic中以往所有的数据都读取出来
不加:此时只会消费最新的数据,原来topic中的数据不会被消费
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
7. 删除topic
kafka-topics.sh --bootstrap-server hadoop102:9820 --delete --topic first
8. 查看指定消费者组的消费topic 的位置情况
kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --describe
9. 设置指定消费者组的消费某个Topic 的偏移量
kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --topic lkp_test --reset-offsets --to-offset 1 --execute
##指定主题、分区、消费偏移
kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test:1 --to-offset 2 --execute
10. 查询topic的offset的范围
用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2
输出
test:0:1288
查询offset的最大值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop102:9092 -topic test --time -1
输出
test:0:7885
从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]
===========================================
Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。
在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。
更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
## Topic的作用域
–all-topics:为consumer group下所有topic的所有分区调整位移)
–topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
–topic t1:0,1,2:为指定的topic分区调整位移
## 重置策略
–to-earliest:把位移调整到分区当前最小位移
–to-latest:把位移调整到分区当前最新位移
–to-current:把位移调整到分区当前位移
–to-offset <offset>: 把位移调整到指定位移处
–shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
–to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file <file>:从CSV文件中读取调整策略
## 确定执行方案
什么参数都不加:只是打印出位移调整方案,不具体执行
–execute:执行真正的位移调整
–export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
## 注意事项
consumer group状态必须是inactive的,即不能是处于正在工作中的状态
不加执行方案,默认是只做打印操作
## 常用示例
更新到当前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
=======================================================
网友评论