1.下载指定版本的安装包
或者直接将下载完成的安装包上传到我们家目录下
★★★★★注意此时分多个版本:
1,测试的安装包必须是这个版本的(“kafka_2.11-0.10.2.0.tgz ”),
- 2.11是针对Scala 2.11版本
- 0.10.2.0是针对kafka本身的版本
2,测试版本是 kafka_2.11-2.4.1.tgz
- 2.11是针对Scala 2.11的版本
- 2.4.1 是针对kafka版本
所以对其进行了不同的测试
- 主机器(更改了主机名、hosts映射)
- 测试机器1:2.11-0.10.2.0 版本
- 测试机器2:2.11-2.4.1 版本
2.解压
tar -zxvf kafka_2.11-0.10.2.0.tgz -C /usr/local/
cd /usr/local
mv kafka_2.11-0.10.2.0/ kafka
# 我们在安装之前的时候可以创建一个新的账户(非必选)
# sudo chown -R hadoop ./kafka
3.测试
完全可以配置环境变量之后可以在任意目录下执行相应的代码,这里在测试的时候没有配置环境变量,所以一直需要先进去当前kafka的主目录下从而进行相应的操作。
(1).启动默认的 Zookeeper
这里为了方便,直接使用的是默认的zookeeper启动的,当然也可通过配置单机的 zookeeper来实现以下操作。
# 新建一个终端窗口
# 0.1,进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
(2.1),启动默认的 Kafka
# 0.2,新建一个终端窗口
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
(2.2),也可以修改Kafka的配置文件server.properties
# 确保每个机器上的id不一样
broker.id=0
# 配置服务端的监控地址
listeners=PLAINTEXT://192.168.51.128:9092
# kafka 日志目录
log.dirs=/data/servers/kafka_2.11-2.4.0/logs
# kafka设置的partitons的个数
num.partitions=1
# zookeeper的连接地址, 如果有自己的zookeeper集群, 请直接使用自己搭建的zookeeper集群,多个节点之间使用“,”隔开
zookeeper.connect=192.168.51.128:2181
(3).创建topic
kafka-topics.sh
:基本脚本命令命令参数详解:
--list
:查看topic
--create
: 创建Topic
--topic <String: topic>
:指定 Topic(主题) 名,
--partitions
:指定该Topic分区的个数,
--replication-factor
:指定每个Partition的备份数
--zookeeper <String: host1:port,host2:port>
:指定 Zookeeper 的连接地址参数提示可能并不赞成这样使用,那么就可以直接指定Kafka的连接地址(DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.)
--bootstrap-server<String: host1:port,host2:port>
:指定kafka的连接信息,采用Direct方式的时候推荐使用这个。
# 0.3,查看当前集群中的topic,在创建之前和之后都可以进行查看,根据自己的实际需要进行相应的修改
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 新建一个终端窗口
# 1.1,主机器(也是就我最近常用的机器,主机名和hosts映射都重新做的)
# 创建一个名字为demo01的Topic,只有1个分区、每个Partition只有1个备份
# 这里是使用的Zookeeper作为连接
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic demo01
# 2.1,测试机器1(也就 第一个版本的测试,第二个版本的测试也是一样的)
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo01
# 创建 topic test1
# 存在的价值:1.在指定连接信息的时候可以采用直连(Direct)的方式,也就是说使用配置的Kafka的地址和端口;2.指定集群中的多个地址的时候可以使用","进行隔开;3.这里使用的ip地址指定的,也可以使用主机名的方式指定,但是前提要配置hosts映射
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
(4).创建Producer
kafka-console-producer.sh
基本脚本命令命令参数详解:
--broker-list <String: broker-list>
:指定kafka的连接信息,集群中多个连接信息之间使用","隔开
--topic <String: topic>
:指定Topic
--timeout <Integer: timeout_ms>
:设置超时时间
--sync
:异步发送信息
# 1.2,常用机器
# 在当前topic窗口执行以下命令来创建Producer
bin/kafka-console-producer.sh --broker-list master:9092 --topic demo01
# 此时可以在命令下边输入消息内容,每一行为一条消息,此时可以将消息推送到kafka服务器,以在consumer中进行查看
# 2.2,测试机器1和2
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo01
# 并在当前窗口运行成功后输入以下信息
hello hadoop
hello wyq
(5).创建Consumer
kafka-console-consumer.sh
:基本脚本命令命令参数详解:
--bootstrap-server
:指定kafka的连接信息,集群中多台机器采用","进行分割
--topic <String: topic>
:指定消费的Topic
--from-beginning
: 是用来指定消费信息是从头开始的。
--group <String: consumer group id>
:指定消费者组
# 新建一个终端窗口
# 1.3.1,常用机器
# 创建消费者,查看消费情况(此时为方法1),此时是通过间接访问 zookeeper的2181端口消费数据
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic demo01 --from-beginning
# 1.3.2,也可以用方法2(说明: 因为此时的版本是旧的,会提示--zookeeper将会被取代,然后我们就可以使用--bootstrap-server去实现,但是指定的连接端口(此时是通过kafka的端口9092消费数据)不一样,而是直接指定连接Producer的端口)
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic demo01 --new-consumer --from-beginning
# 测试机器1(也就是 2.11-0.10.2.0 版本)
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo01 --from-beginning
# 此时可以看到我们刚刚生产的三条消息,说明kafka安装成功
# 测试机器2(也就是 2.11-2.4.1 版本)
# 注意此时版本中将之前的"--new-consumer"参数取消了,所以可以直接去掉
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo01 --from-beginning
4.关闭所有程序的顺序
原则上是:按照启动顺序倒着关!!!
- 1.关闭 Consumer
- 2.关闭 Producer
- 3.关闭 Kafka
- 4.关闭 zookeeper
5.附加操作
(1).后台常驻的方式启动kafka,带上参数 -daemon
(2).停止kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/bin/config/server.properties
(3),查询Topic
1).查看指定的Topic的详细信息
--describe
配合--topic
参数查看指定Topic的详细信息,同时要指定--zookeeper
的连接信息
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
2).查看 topic 指定分区 offset 的最大值或最小值
time 为 -1 时表示最大值,为 -2 时表示最小值:
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0
(4),增加Topic的Partition数量
/usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
(5),删除Topic
--delete
配合--topic
参数指定要删除的Topic,同时要指定--zookeeper
的连接信息如果要实现真正意义上的删除,必选将配置文件中
delete.topic.enable=true
这个配置项进行更改;因为会提示以下的信息Topic demo02 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 1.需要事先将config/server.properties
delete.topic.enable=true
# 2.执行删除操作
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(6).消费信息
1).指定从头开始
--from-beginning
关键字的使用!
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2),从尾部获取数据,此时必须指定分区
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
3).指定获取的个数的信息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1
网友评论