0.环境准备与参考资料
(1).部署前提
1),部署kafka是依赖zookeeper的,所以需要事先配置zookeeper.
2),查看进程需用使用jps,所以要先部署java环境
(2).参考资料
1),[kafka简介]
2),kafka集群搭建
1.解压
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local/
2.修改配置文件
(1),对于Kafka,一个broker仅仅只是一个集群的大小,所有我们多可以设置多个broker。
(2),说明
broker.id是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。
vi /usr/local/kafka/config/server.properties
# (1).指定代理id,borker.id可以任意指定,前提是保证集群内每台机器的broker.id唯一,第二台机器设置为2...以此类推,依次设置每一台机器的id值
broker.id=1
# 在其他两台机器(master2,slave)中修改
broker.id=2
broker.id=3
# (2).kafka数据的存放目录,而非Kafka的日志目录(最好三台机器中都是不一样的)
log.dirs=/tmp/kafka-logs-1
# 在其他两台机器(master2,slave)中修改
log.dirs=/tmp/kafka-logs-2
log.dirs=/tmp/kafka-logs-3
# (3).设置zookeeper集群地址
zookeeper.connect=master:2181,master2:2181,slave:2181
# (4).设置本机地址,设置为本服务器的ip地址(依次设置每一台机器的地址)。如果不设置会在创建主题和发送消息时,发生NOT LEADER FOR PARTITION异常。
host.name=master
# 在其他两台机器(master2,slave)中修改
host.name=master2
host.name=slave
# (5).修改端口
listeners=PLAINTEXT://:9092
# 在其他两台机器(master2,slave)中修改
listeners=PLAINTEXT://:9093
listeners=PLAINTEXT://:9094
3.修改环境变量
vi .bash_profile
# (1).追加以下内容
export JAVA_HOME=/usr/local/jdk
export ZOOKEEPER_HOME=/usr/local/zookeeper
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
# (2).修改完环境变量之后需要立即生效
source .bash_profile
# (3).将修改的配置文件将配置文件同步到其他两台服务器中
scp .bash_profile master2:/root
scp .bash_profile slave:/root
# (4).在同步到其他两台服务器中需要立即生效配置文件
source .bash_profile
4.启动服务
需要在Zookeeper已经启动的情况下启动
(1).在一个节点中启动
# 启动kafka
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
(2).在另外两个新的kafka节点启动
# 我们已经运行了zookeeper和刚才的一个kafka节点,所有我们只需要在启动2个新的kafka节点(在master2和slave节点)。
# 1.master2节点
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 2.slave节点
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
(3).关闭kafka
# (1),关闭master节点的kafka
kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties
# (2),关闭master2节点和slave节点的kafka
kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties
kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties
5.创建Topic
(1).创建一个名为"test"的Topic,只有一个分区和备份
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
# (1.1),执行结果
Created topic "test".
# (1.2),查看已经创建好的topic信息
[root@master ~]# kafka-topics.sh --list --zookeeper master:2181
# (1.3),执行结果
test
(2),创建一个新topic,把备份设置为:3
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# (2.1).执行结果
Created topic "my-replicated-topic".
# (2.2),查看每个集群节点在做什么
kafka-topics.sh --describe --zookeeper master:2181 --topic my-replicated-topic
# (2.3),执行结果
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
# (2.4),解读
输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。
"leader":该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
"replicas":备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
"isr":“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
(3).删除topic
kafka-topics.sh --delete --topic test --zookeeper master:2181
6.发送消息
(1).向"test"Topic发送信息
# Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
# 运行producer(生产者),然后在控制台输入几条消息到服务器。
kafka-console-producer.sh --broker-list master:9092 --topic test
> # 在此处输入消息信息
(2).向"my-replicated-topic"Topic发送信息
kafka-console-producer.sh --broker-list master:9092 --topic my-replicated-topic
> # 在此处输入消息信息
7.消费信息
(1).从"test"Topic拉取信息
# Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来。
kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --from-beginning
# 在这里会显示在发送端发送的信息;如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
(2).从"my-replicated-topic"Topic拉取信息
kafka-console-consumer.sh --zookeeper master2:2181 --from-beginning --topic my-replicated-topic
# 执行结果
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
网友评论