consumer group重新设置位移
前提是 consumer group不 能处于运行状态,也就是说它必须是 inactive 的
第一步是确定消费者组下 topic的作用域,当前支持 3种作用域,它们分别如下。
--all-topics:为消费者组下所有 topic 的所有分区调整位移 。
--topic t1,--topic t2:为指定的若干个 topic 的所有分区调整位移 。
--topic t1:0,1,2:为 topic的指定分区调整位移。
确定了 topic作用域之后,第二步就是确定位移重设策略 。当前支持如下 8种设置规则 。
--to-earliest:把位移调整到分区当前最早位移处 。
--to-latest:把位移调整到分区当前最新位移处。
--to-current:把位移调整到分区当前位移处 。
--to-offset <offset>:把位移调整到指定位移处。
--shift-by N:把位移调整到当前位移+N处。 N可以是负值。
--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处 。datatime 格式是yyyy-MM-ddTHH:mm:ss.xxx,比如 20l7-08-04T00:00:00.000。
--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处 。duration格式 是 PnDTnHnMnS,比如 PT0H5M0S。
--from-file <file>:从csv文件中读取位置调整策略
最后一步是确定执行方案,当前支持如下 3种方案 。
不加任何参数:只是打印位移调整方案,不实际执行 。
--execute:执行真正的位移调整。
--export:把位移调整方案保存成 csv 格式并输出到控制台,方便用户保存成 csv 文件,供后续结合from-file参数使用 。
测试
终端下启动 一个 console consumer程序,组名设置为 test-group:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal-example --from-beginning --consumer-property group.id=test-group
待运行一段时间后按下 Ctrl+C 组合键关闭 console consumer 程序,将 test-group 设置为inactive状态 。 之后运行 kafka-consumer-groups.sh脚本,确定当前 group 的消费进度:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
最早:--to-earliest
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --topic canal-example --to-earliest --execute
大于指定时间:--to-datetime
bin/kafka- consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2018-04-01T14:30:00. 000
获取 topic 当前消息数
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
bin/kafka-run-class .sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
--time -1 命令表示要获取指定 topic所有分区当前的最大位移:而一time -2表示获取当前最早位移。随着集群的不断运行,topic 的数据可能会被移除 一部分 ,因--time 斗的结果其实表示的是历史上该 topic 生产的最大消息数。如果用户要统计当前的消息总数就必须减去 --time -2 的结果 。
网友评论