1.先启动zookeeper
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
![](https://img.haomeiwen.com/i20425468/49206a7e9f15a28b.png)
2.再启动kafka服务
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
![](https://img.haomeiwen.com/i20425468/888f0ca04f305f89.png)
3.创建主题(topic)
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.91.128:2181 --replication-factor 1 --partitions 1 --topic 1707D
--bootstrap-server 指定zookeeper的地址端口 --replication-factor 备份 --partitions分区 --topic 主题
![](https://img.haomeiwen.com/i20425468/e45e993f8320ab2c.png)
(备注: 或者,您可以将代理配置为在发布不存在的主题时自动创建主题,而不是手动创建主题。)
查看主题: /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.91.128:2181
![](https://img.haomeiwen.com/i20425468/7bd4ce70e409a8fe.png)
删除主题:
(1) 在server.properties中增加设置,默认未开启 delete.topic.enable=true
(2) /opt/kafka/bin/kafka-topics.sh --delete --topic 1707D --zookeeper 192.168.91.128:2181
![](https://img.haomeiwen.com/i20425468/7f7c76d2746220ab.png)
4.再启动kafka的生成者(producer) ,发送消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.91.128:9092 --topic 1707D
![](https://img.haomeiwen.com/i20425468/fb6b070dd3f1f567.png)
5.最后启动kafka的消费者(consumer),接收消息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.91.128:9092 --topic 1707D --from-beginning 从第一条开始接收
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.91.128:9092 --topic 1707D从现在生产者发送开始接收。
![](https://img.haomeiwen.com/i20425468/31a848167a8d0b82.png)
6.查看进程,杀死kafka进程
jps:查看zookeeper进程 (zookeeper是用java编写的)
ps:查看redis进程 (因为redis是用c语言编写的,所以不能直接查看
![](https://img.haomeiwen.com/i20425468/ecee993398b175ec.png)
网友评论