美文网首页Kafka
kafka查询和修改topic的offset

kafka查询和修改topic的offset

作者: 逝水如歌 | 来源:发表于2018-10-31 17:22 被阅读0次
查询topic的offset的范围
用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2

输出

test:0:1288

查询offset的最大值:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -1

输出

test:0:7885
从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]

设置consumer group的offset
启动zookeeper client

/zookeeper/bin/zkCli.sh

通过下面命令设置consumer group:testgroup topic:test partition:0的offset为1288:

set /consumers/testgroup/offsets/test/0 1288

注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:

set /kafka/consumers/testgroup/offsets/test/0 1288

重启相关的应用程序,就可以从设置的offset开始读数据了。

手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:

[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

在不输入参数的情况下,我们可以得知kafka.tools.UpdateOffsetsInZK类需要输入的参数。我们的consumer.properties文件配置内容如下:

zookeeper.connect=www.iteblog.com:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=group

这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:

[iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
earliest config/consumer.properties iteblog
updating partition 0 with new offset: 276022922
updating partition 1 with new offset: 234360148
updating partition 2 with new offset: 157237157
updating partition 3 with new offset: 106968019
updating partition 4 with new offset: 80696130
updating partition 5 with new offset: 317144986
updating partition 6 with new offset: 299182459
updating partition 7 with new offset: 197012246
updating partition 8 with new offset: 230433681
updating partition 9 with new offset: 120971431
updating partition 10 with new offset: 51200673
updated the offset for 11 partitions

相关文章

  • kafka查询和修改topic的offset

    查询topic的offset的范围 用下面命令可以查询到topic:test broker:suna:9092的o...

  • kafka 常用命令

    Kafka常用topic操作命令汇总 offset topic consumer-group consumer p...

  • Flink kafka source源码解析(二)

    offset提交模式(非checkpoint) 消费kafka topic最为重要的部分就是对offset的管理,...

  • Kafka Offset Topic

    Kafka中有一个位移主题:__consumer_offsets。老版本的Kafka将位移保存在Zookeeper...

  • Kafka操作指令

    启动kafka 安全关闭kafka 创建topic 删除topic 查询topic 启动控制台Producer,向...

  • kafka包中的相关命令

    1.查看指定topic对应分区的offset 2.启动kafka 3. kafka相关命令 http://blog...

  • kafka中的topic、partition和offset

    一、简介 1. 什么是kafka Kafka 是一款分布式消息发布和订阅系统,具有高性能、高吞吐量的特点而被广泛应...

  • kafka基础

    一、常用shell命令 1. 列出所有topic 2. 查看kafka版本 3. 查看offset

  • Fetch Offset Range in Kafka with

    有的时候需要检出Kafka中某个topic的所有partition的offset range. 比如Spark S...

  • 修改kafka分区数

    这里记录下如何修改kafka的topic分区 查看当前topic分区,假设主题是testtopic 修改分区个数 ...

网友评论

    本文标题:kafka查询和修改topic的offset

    本文链接:https://www.haomeiwen.com/subject/qvoatqtx.html