Kafka是一个分布式流媒体平台。发布和订阅记录流,类似于消息队列或企业消息传递系统。以容错持久的方式存储记录流。处理记录发生的流。本文讲述在三台主机上安装kafka集群的主要步骤
主要内容:
- 1.启动Zookeeper
- 2.安装Kafka
- 3.测试
- 4.其它
- 5.一键脚本
集群规划如下:
用户 | 主机名 | ip | 进程 |
---|---|---|---|
hadoop | hadoop1 | 192.168.2.111 | Zookeeper、Kafka |
hadoop | hadoop2 | 192.168.2.112 | Zookeeper、Kafka |
hadoop | hadoop3 | 192.168.2.113 | Zookeeper、Kafka |
1.启动Zookeeper
之前Zookeeper集群安装已经安装好了,现在只需要启动即可
可以使用脚本批量启动,也可以多窗口运行命令启动,启动后的状态如下:
使用客户端创建一个/kafka目录来存放kafka相关的配置文件
./bin/zkCli.sh
创建kafka节点来存放kafka的配置文件
create /kafka ''
1.2.安装Kafka
1.2.1.下载
下载地址:传送们
根据自己的Scala版本下载相应的kafka版本即可,如果没有,就自己编译
1.2.2.上传解压
tar -zxvf kafka_2.11-1.0.1.tgz -C /opt/soft
1.2.3.配置启动
1、修改config/server.properties文件如下:
#必须 设置broker.id(从0开始,3个节点分别设为0,1,2,不能重复)
broker.id=0
#可选 用来监听链接的端口,producer 或 consumer 将在此端口建立连接
port=9092
#可选 日志文件目录
log.dirs=/opt/soft/kafka_2.11-1.0.1/kafka-logs
#可选 在当前 broker 上的partition数量
num.partitions=1
#必须 Zookeeper服务器
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
这里需要说明的是,默认Kafka会使用ZooKeeper默认的根 "/" 路径,这样有关Kafka的配置就会散落在ZooKeeper根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个路径,这就是我们为什么用zookeeper客户端创建/kafka节点的原因,然后直接在zookeeper.connect配置项中指定:
zookeeper.connect= node2:2181,node3:2181,node4:2181/kafka
2、将kafka_2.11-1.0.1拷贝到其它主机(hadoop2、hadoop3)
scp -r kafka_2.11-1.0.1/ hadoop@hadoop2:/opt/soft
scp -r kafka_2.11-1.0.1/ hadoop@hadoop3:/opt/soft
3、修改其它主机(hadoop2、hadoop3)的broker.id
broker.id=1
broker.id=2
4、启动
可以编写一键启动脚本,也可以批量窗口操作,运行如下命令:
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
5、查看是否启动成功
jps
image.png
如图,每台机器都应该有如上图的2个进程
3.测试
3.1.创建、查看、生产、消费
# 创建一个叫做“TEST”的topic,它有3个分区,3个副本
./bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic TEST --partitions 3 --replication-factor 3
# 查看Topics
./bin/kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
# 创建一个生成者往TEST写数据
./bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic TEST
# 创建一个消费者从头开始消费
./bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic TEST --from-beginning
生产者
Topic名为TEST,有3个Partition ,有3个副本
编号为0的Partition,Leader在broker.id=1这个节点上,负责该Partition的读写,副本在broker.id为1、0、2这个三个节点上,Isr表示所有存活的副本,并跟broker.id=1这个节点同步
3.2.删除
1、删除topic
./bin/kafka-topics.sh --delete --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic TEST
4.其它
- 如遇到问题可以在前面设置的日志文件夹log.dirs里查看启动日志
- 查看Zookeeper 下 /kafka里的配置信息
ls /kafka/brokers/ids
image.png
5.一键脚本
设置环境变量(三台主机)
export KAFKA_HOME=/opt/soft/kafka_2.11-1.0.1/
export PATH=$PATH:$KAFKA_HOME/bin
startkafka.sh 一键启动脚本(设置Kafka的环境变量)
cat ./slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;nohup kafka-server-start.sh /opt/soft/kafka_2.11-1.0.1/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
stopkafka.sh 一键停止脚本(设置Kafka的环境变量)
cat ./slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -c 1-4 |xargs kill -s 9 "
}&
wait
done
slave
hadoop1
hadoop2
hadoop3
网友评论