一、Low-API操作:
kafka生产消息(--broker-list后跟kafka地址,注:不是zookeeper地址):
./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_co2
上述生产的消息key是空的,若想生产指定key的消息:
./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_co2 --property "parse.key=true" --property "key.separator=:"
如,需使用shell脚本生产一批带key的消息:
for (( i=1; i<=10; i++ )); do echo "key$$i:value$$i" | kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_co2 --property "parse.key=true" --property "key.separator=:"; done;
kafka消费消息(--zookeeper后跟zk地址):
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic safeclound_co2
kafka创建主题:
./kafka-topics.sh --zookeeper localhost:2181 --create --topic safeclound_co2 --partitions 10 --replication-factor 1
kafka查看主题列表:
./kafka-topics.sh --zookeeper localhost:2181 --list
kafka查看主题详情:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic safeclound_co2
kafka删除主题:
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic safeclound_co2
kafka添加分区:
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic safeclound_co2 --partitions 15
二、High-API操作:
查看主题的所有分区数据信息:
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -2 (-2表示最早offset)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic safeclound_co2 --time -1 (-1表示最新offset)
查看group.id所有消费情况(offset):
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group safeclound_spark_test --describe
或者
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group safeclound_spark_test --topic safeclound_co2
三、使用auto.offset.reset的正确姿势:
若业务只需消费最新数据,将auto.offset.reset=latest,OK~~ 经亲生产环境血的经历,对不起,这样设置不生效!
那咋办呢? -----> 还得加上 enable.auto.commit=false
为什么还要加enable.auto.commit呢? 经各种baidu、Google了解到,因为如果enable.auto.commit=true会自动提交offset,虽然设置了auto.offset.reset=latest,但是它会优先使用已经存储的offset,因此auto.offset.reset=latest不生效!!! 其实仔细想想它这样实现也有道理!
注意:
1、以上使用的版本为kafka_0.10.x(在0.8.x~0.10.x之间存在offset是否存储于zk的区别)
2、Kafka目前不支持减少分区数和改变备份数。
参考资料
网友评论