kafka
#解压,创建软链接
cd /home/hadoop/app/
tar -zxvf kafka_2.11-0.9.0.1.tgz
ln -s kafka_2.11-0.9.0.1 kafka
rm -rf kafka_2.11-0.9.0.1.tgz #删除安装文件,节省空间
#修改配置文件
cd kafka/config/
vim server.properties
broker.id=1 #每个机器上不同,如(broker.id=2, broker.id=3)
log.dirs=/export/servers/log/kafka #是kafka的日志文件目录,存放的是消息数据,以主题命名的分区
zookeeper.connect=zk03:2181,zk02:2181,zk01:2181
#################################################
#分发到其他机器
mkdir /export/servers/log/kafka -p
scp -rp ./kafka hdp-node-02:/home/hadoop/app/
scp -rp ./kafka hdp-node-03:/home/hadoop/app/
#启动
#依次在各节点上启动kafka
bin/kafka-server-start.sh config/server.properties
#or
/usr/hadoop/kafka_2.11-0.10.2.0/bin/kafka-server-start.sh -daemon /usr/hadoop/kafka_2.11-0.10.2.0/config/server.properties
#后台启动
kafka-server-start.sh config/server.properties &
查看当前服务器中的所有topic
bin/kafka-topics.sh --list --zookeeper zk01:2181
创建topic
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 3 --topic first
删除topic
bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
通过shell命令发送消息
kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima
通过shell消费消息
bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1
写--from-beginning会显示历史消息,如果只想显示最新的可以不写
查看消费位置
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
查看某个Topic的详情
kafka-topics.sh --topic test --describe --zookeeper zk01:2181
停止服务
bin/kafka-server-stop.sh
添加分区
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3 sh bin./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest
查看topic的偏移
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic kafkatest2 --time -1 sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic kafkatest2 --time -2
用ConsumerGroupCommand工具,我们可以使用list,describe,或delete消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 例如,要列出所有主题中的所有用户组:
查看所有的消费者组
bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
monitorwebsitegroup
syslog
ntaflowgroup
aptgroup
网友评论