美文网首页
kafka shell命令操作

kafka shell命令操作

作者: 无来无去_A | 来源:发表于2020-08-05 17:53 被阅读0次

    topic

    1. 查看topic
    选项说明:
    - --list :查看kafka所有的topic
    - --bootstrap-server : 连接kafka集群
    - --hadoop102:9092:hadoop102是指连接kafka任意一台机器,9092:kafka内部通信的端口
    kafka-topics.sh  --bootstrap-server hadoop102:9092 --list
    
    2. 创建topic
    选项说明:
    
    - --topic  : 定义topic名字
    - --partitions  : 定义分区数 
    - --replication-factor : 定义副本数
    kafka-topics.sh  --bootstrap-server hadoop102:9092 --create --topic first 
    --partitions 2 --replication-factor 2
    
    3. 查看某个Topic的详情
    选项说明:
    - --topic first : 查看指定的话题,如果不加此选项,则表示查看所有的话题
    kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
    
    4.修改分区数
    说明:
    
    - --分区数只能增加不能减少 ,要不然消减的分区数据不好处理
    - 分区内部消息有序,分区之间消息无序
    kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 6
    
    5. 发送消息
    选项说明:
    
    - hadoop102:9092,hadoop103:9092,hadoop104:9092 : kafka的集群中的broker,
    其实写一个也是可以的,写3个的目的只是避免当连接的kafka集群broker故障时连不上kafka集群的情况。
    
    kafka-console-producer.sh --broker-list  hadoop102:9092,hadoop103:9092,hadoop104:9092 
    --topic first
    
    6. 消费消息
    
    选项说明:
    
    - --from-beginning :
    
      加上:会把topic中以往所有的数据都读取出来
    
      不加:此时只会消费最新的数据,原来topic中的数据不会被消费
    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
    
    7. 删除topic
    kafka-topics.sh --bootstrap-server hadoop102:9820 --delete --topic first
    
    8. 查看指定消费者组的消费topic 的位置情况
    kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --describe
    
    9. 设置指定消费者组的消费某个Topic 的偏移量
    kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --group test --topic lkp_test --reset-offsets --to-offset 1 --execute
    
    ##指定主题、分区、消费偏移
    kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test:1 --to-offset 2 --execute
    
    
    10. 查询topic的offset的范围
    用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:
    
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2
    输出
    
    test:0:1288
    查询offset的最大值:
    
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop102:9092 -topic test --time -1
    输出
    
    test:0:7885
    从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]
    
    ===========================================
    Kafka Version为0.11.0.0之后,Consumer的Offset信息不再默认保存在Zookeeper上,而是选择用Topic的形式保存下来。
    
    在命令行中可以使用kafka-consumer-groups的脚本实现Offset的相关操作。
    
    更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。
    
    ## Topic的作用域
    
    –all-topics:为consumer group下所有topic的所有分区调整位移)
    –topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
    –topic t1:0,1,2:为指定的topic分区调整位移
    
    ## 重置策略
    
    –to-earliest:把位移调整到分区当前最小位移
    –to-latest:把位移调整到分区当前最新位移
    –to-current:把位移调整到分区当前位移
    –to-offset <offset>: 把位移调整到指定位移处
    –shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
    –to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
    –by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
    –from-file <file>:从CSV文件中读取调整策略
    
    ## 确定执行方案
    
    什么参数都不加:只是打印出位移调整方案,不具体执行
    –execute:执行真正的位移调整
    –export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用
    
    ## 注意事项
    
    consumer group状态必须是inactive的,即不能是处于正在工作中的状态
    不加执行方案,默认是只做打印操作
    
    ## 常用示例
    
    更新到当前group最初的offset位置
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
    
    更新到指定的offset位置
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
    
    更新到当前offset位置(解决offset的异常)
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
    
    offset位置按设置的值进行位移
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
    
    offset设置到指定时刻开始
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
    =======================================================
    

    相关文章

      网友评论

          本文标题:kafka shell命令操作

          本文链接:https://www.haomeiwen.com/subject/mtjorktx.html