美文网首页
kafka shell命令操作

kafka shell命令操作

作者: 无来无去_A | 来源:发表于2020-08-05 17:53 被阅读0次

topic

1. 查看topic
选项说明:
- --list :查看kafka所有的topic
- --bootstrap-server : 连接kafka集群
- --hadoop102:9092:hadoop102是指连接kafka任意一台机器,9092:kafka内部通信的端口
kafka-topics.sh  --bootstrap-server hadoop102:9092 --list

2. 创建topic
选项说明:

- --topic  : 定义topic名字
- --partitions  : 定义分区数 
- --replication-factor : 定义副本数
kafka-topics.sh  --bootstrap-server hadoop102:9092 --create --topic first 
--partitions 2 --replication-factor 2

3. 查看某个Topic的详情
选项说明:
- --topic first : 查看指定的话题,如果不加此选项,则表示查看所有的话题
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first

4.修改分区数
说明:

- --分区数只能增加不能减少 ,要不然消减的分区数据不好处理
- 分区内部消息有序,分区之间消息无序
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 6

5. 发送消息
选项说明:

- hadoop102:9092,hadoop103:9092,hadoop104:9092 : kafka的集群中的broker,
其实写一个也是可以的,写3个的目的只是避免当连接的kafka集群broker故障时连不上kafka集群的情况。

kafka-console-producer.sh --broker-list  hadoop102:9092,hadoop103:9092,hadoop104:9092 
--topic first

6. 消费消息

选项说明:

- --from-beginning :

  加上:会把topic中以往所有的数据都读取出来

  不加:此时只会消费最新的数据,原来topic中的数据不会被消费
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

7. 删除topic
kafka-topics.sh --bootstrap-server hadoop102:9820 --delete --topic first

8. 查看指定消费者组的消费topic 的位置情况
kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --describe

9. 设置指定消费者组的消费某个Topic 的偏移量
kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --topic lkp_test --reset-offsets --to-offset 1 --execute

##指定主题、分区、消费偏移
kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test:1 --to-offset 2 --execute


10. 查询topic的offset的范围
用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2
输出

test:0:1288
查询offset的最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop102:9092 -topic test --time -1
输出

test:0:7885
从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]

===========================================
Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。

在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。

更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。

## Topic的作用域

–all-topics:为consumer group下所有topic的所有分区调整位移)
–topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
–topic t1:0,1,2:为指定的topic分区调整位移

## 重置策略

–to-earliest:把位移调整到分区当前最小位移
–to-latest:把位移调整到分区当前最新位移
–to-current:把位移调整到分区当前位移
–to-offset <offset>: 把位移调整到指定位移处
–shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
–to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file <file>:从CSV文件中读取调整策略

## 确定执行方案

什么参数都不加:只是打印出位移调整方案,不具体执行
–execute:执行真正的位移调整
–export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

## 注意事项

consumer group状态必须是inactive的,即不能是处于正在工作中的状态
不加执行方案,默认是只做打印操作

## 常用示例

更新到当前group最初的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute

更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute

offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute

offset设置到指定时刻开始
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
=======================================================

相关文章

网友评论

      本文标题:kafka shell命令操作

      本文链接:https://www.haomeiwen.com/subject/mtjorktx.html