参考:
https://www.cnblogs.com/honeybee/p/5258906.html
创建:
step1 启动zookeeper服务
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.out &
step2 启动kafka服务
nohup bin/kafka-server-start.sh config/server.properties &
nohup bin/kafka-server-start.sh config/server.properties > kafka_server.out &
step3 创建一个kafka消息队列
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2
注:
--partitions 1 为分区数,为分区数,0.8.1默认就是2
--replication-factor 3 设置是topic消息的备份份数为3份,即副本数
创建消息队列,并指定topic的配置参数(消息的最大值参数max.message.bytes )
kafka-topics.sh --create --zookeeper $ZK_CONNECT --replication-factor 2 --partitions 200 --topic cdn_gz_data --config max.message. bytes=10485760 --config flush.message=1
生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费
bin/kafka-console-consumer.sh -zookeeper **** --topic ***
查看topic进度
查看topic消费进度
-
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list **** --topic **** --time
-
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker已经不支持了。你应该使用kafka.admin.ConsumerGroupCommand(或bin/kafka-consumer-groups.sh脚本)来管理消费者组,包括用新消费者API创建的消费者。
- ./kafka-consumer-groups.sh --zookeeper
网友评论