1 下载kafka安装包
官方下载地址http://kafka.apache.org/downloads
使用镜像地址下载,https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
image.png2 安装kafka
上传到指定目录并解压(强迫症重命名了)
[root@localhost usr]# tar -zxvf kafka_2.13-2.7.0.tgz
[root@localhost usr]# mv kafka_2.13-2.7.0/ kafka
创建kafka数据目录和zk日志目录,默认放在/temp目录下个人感觉不安全
[root@localhost config]# cd /usr/kafka/
[root@localhost kafka]# mkdir data
修改kafka数据目录配置
[root@localhost kafka]# cd config/
[root@localhost config]# vim server.properties
将log.dirs=/tmp/kafka-logs
修改为log.dirs=../data
修改zk配置
[root@localhost config]# vim zookeeper.properties
将dataDir=/temp/zookeeper
修改为dataDir=../zookeeper
,端口可根据自己需要修改
ps:如果修改了默认的zk端口,server.properties中的zookeeper.connect=localhost:2181
也需要对应的修改
3 kafka的启动与关闭
3.1 启动zk
[root@localhost config]# cd /usr/kafka
[root@localhost kafka]# ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
[root@localhost kafka]# netstat -anpt | grep 2181
image.png
可以看到zk的2181已经开始被监听,zookeeper-server-start.sh
为zk的启动脚本,-daemon
参数为以守护进程的方式运行zk,./config/zookeeper.properties
指定zk启动时使用的配置文件
3.2 启动kafka
[root@localhost kafka]# ./bin/kafka-server-start.sh -daemon ./config/server.properties
image.png
./bin/kafka-server-start.sh
kafka的启动脚本,-daemon
守护进程方式运行,./config/server.properties
指定使用的配置
3.3 关闭kafka
[root@localhost kafka]# ./bin/kafka-server-stop.sh ./config/server.properties
3.4 关闭zk
[root@localhost kafka]# ./bin/zookeeper-server-stop.sh ./config/zookeeper.properties
为了避免每次敲命令,可以自己写成启动关闭脚本,懒人基操
4 测试kafka
4.1 创建topic
[root@localhost kafka]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
所有和主题相关的操作基本./bin/kafka-topics.sh
这个脚本,--create
创建操作,--zookeeper localhost:2181
指定zookeeper,--replication-factor
指定主题副本数,--partitions 1
指定主题分区数,--topic
指定主题名称
4.2 查看topic列表
[root@localhost kafka]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
--list
指定操作为查看主题列表,因为kafka的主题,副本,分片信息都保存在zookeeper中,所以查看主题列表的时候需要指定zookeeper
4.3 生产者发送消息
[root@localhost kafka]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-producer.sh
生产者相关的操作由此脚本执行,--broker-list
参数指定了所使用的broker,--topic
指定要往哪个主题发送消息
4.4 消费者接收消息
[root@localhost kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
./bin/kafka-console-consumer.sh
消费者相关的操作由此脚本执行,新版本的kafka使用--bootstrap-server
参数,0.8版本以前消费进度是直接写到 zookeeper 的,consumer 必须知道 zookeeper 的地址。这个方案有性能问题,0.9 的时候整体大改了一次,brokers 接管了消费进度,consumer 不再需要和 zookeeper 通信了。--topic
指定消费的主题,--from-beginning
表示从分区开头进行消费,即重新消费全部消息
可以看到消费者已经成功到消息了
成功消费消息.png
4.5 查看和删除topic
[root@localhost kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@localhost kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
网友评论