美文网首页Kafka
kafka查询和修改topic的offset

kafka查询和修改topic的offset

作者: 逝水如歌 | 来源:发表于2018-10-31 17:22 被阅读0次
    查询topic的offset的范围
    用下面命令可以查询到topic:test broker:suna:9092的offset的最小值:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -2

    输出

    test:0:1288

    查询offset的最大值:

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list suna:9092 -topic test --time -1

    输出

    test:0:7885
    从上面的输出可以看出topic:test只有一个partition:0 offset范围为:[1288,7885]

    设置consumer group的offset
    启动zookeeper client

    /zookeeper/bin/zkCli.sh

    通过下面命令设置consumer group:testgroup topic:test partition:0的offset为1288:

    set /consumers/testgroup/offsets/test/0 1288

    注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:

    set /kafka/consumers/testgroup/offsets/test/0 1288

    重启相关的应用程序,就可以从设置的offset开始读数据了。

    手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:

    [iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

    USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

    在不输入参数的情况下,我们可以得知kafka.tools.UpdateOffsetsInZK类需要输入的参数。我们的consumer.properties文件配置内容如下:

    zookeeper.connect=www.iteblog.com:2181
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    #consumer group id
    group.id=group

    这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:

    [iteblog@www.iteblog.com ~]$ bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
    earliest config/consumer.properties iteblog
    updating partition 0 with new offset: 276022922
    updating partition 1 with new offset: 234360148
    updating partition 2 with new offset: 157237157
    updating partition 3 with new offset: 106968019
    updating partition 4 with new offset: 80696130
    updating partition 5 with new offset: 317144986
    updating partition 6 with new offset: 299182459
    updating partition 7 with new offset: 197012246
    updating partition 8 with new offset: 230433681
    updating partition 9 with new offset: 120971431
    updating partition 10 with new offset: 51200673
    updated the offset for 11 partitions

    相关文章

      网友评论

        本文标题:kafka查询和修改topic的offset

        本文链接:https://www.haomeiwen.com/subject/qvoatqtx.html