不多说,直接开整,首发在个人博客
环境背景:
服务器版本: CentOS Linux release 7.3.1611
Kafka版本: kafka_2.12-2.0.0 (最新稳定版)
java版本: java version "1.8.0_45"
服务器: (默认在所以节点执行,需要在各个节点执行会标注出来)
- node1: 10.241.0.10
- node2: 10.241.0.11
- node3: 10.241.0.12
hosts配置:
# cat /etc/hosts
10.241.0.10 node1
10.241.0.11 node2
10.241.0.12 node3
1. 安装java环境
1) 使用源码包安装java
# tar zxvf jdk-8u45-linux-x64.tar.gz
# mv jdk1.8.0_45/ /usr/local/jdk1.8.0_45
2) 配置环境变量
# cat >> /etc/profile << EOF
export JAVA_HOME=/usr/local/jdk1.8.0_45
export JRE_HOME=\${JAVA_HOME}/jre
export CLASSPATH=.:\${JAVA_HOME}/lib:\${JRE_HOME}/lib
export PATH=\${JAVA_HOME}/bin:\$PATH
EOF
3) 测试环境变量
# source /etc/profile
# java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
# env | grep jdk
JRE_HOME=/usr/local/jdk1.8.0_45/jre
PATH=/usr/local/jdk1.8.0_45/bin:/usr/local/jdk1.8.0_45/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin
JAVA_HOME=/usr/local/jdk1.8.0_45
CLASSPATH=.:/usr/local/jdk1.8.0_45/lib:/usr/local/jdk1.8.0_45/jre/lib
2. 下载安装包
1) 下载kafka (kafka里面包含zookeeper)
# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
3. 安装配置zookeeper
1) 解压和安装
# tar zxf kafka_2.12-2.0.0.tgz
# mv kafka_2.12-2.0.0 /usr/local/kafka
2) 重新生成配置文件
# cat > /usr/local/kafka/config/zookeeper.properties << EOF
syncLimit=5
initLimit=10
ticketTime=2000
clientPort=2181
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
server.1=10.241.0.10:2888:3888
server.2=10.241.0.11:2888:3888
server.3=10.241.0.12:2888:3888
EOF
3) 参数解释
ticketTime: 默认值是2000,以毫秒为单位.用来调节心跳和超时.
syncLimit: 默认值是5,tickTime值的5倍,配置leader和followers间心跳检测的最大延迟时间.
initLimit: 默认值是10, tickTime值的10倍,配置允许 followers 连接并同步到 leader 的最大时间.
clientPort: 服务器监听客户端连接ZooKeeper的端口
dataDir: 指定ZooKeeper存储内存数据库快照的目录,如不指定dataLogDir,那么事务日志也会存储到这个目录下.
dataLogDir: 指定ZooKeeper事务日志的存储目录.
maxClientCnxns: socket级别限制单个客户端与单台服务器之前的并发连接数量,通过IP地址来区分不同的客户端,默认值是 60,将其设置为0取消并发限制.
4) 创建数据目录
# mkdir -p /data/zookeeper/{data,logs}
5) 生成每个zookeeper节点的唯一标识文件
#node1-node3节点分别执行,要与配置文件中的server.[id]的id保持一致:
# echo 1 > /data/zookeeper/data/myid
# echo 2 > /data/zookeeper/data/myid
# echo 3 > /data/zookeeper/data/myid
6) 启动zookeeper(-daemon参数为后台启动,如不加则为前台启动)
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties
7) 测试zookeeper
#Mode分两种角色: leader和follower,
[root@squid ~]# telnet 10.241.0.10 2181
Trying 10.241.0.10...
Connected to 10.241.0.10.
Escape character is '^]'.
stat
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
Clients:
/10.241.0.1:41462[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x200000000
Mode: leader
Node count: 4
Proposal sizes last/min/max: -1/-1/-1
Connection closed by foreign host.
[root@squid ~]# telnet 10.241.0.12 2181
Trying 10.241.0.12...
Connected to 10.241.0.12.
Escape character is '^]'.
stat
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
Clients:
/10.241.0.1:39068[0](queued=0,recved=1,sent=0)
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: follower
Node count: 4
Connection closed by foreign host.
4. 配置kafka
1) 修改配置文件(只修改下面几个参数,其他保持默认不变)
- node1节点:
# cat /usr/local/kafka/config/server.properties
broker.id=1
log.dirs=/data/kafka/logs
zookeeper.connect=10.241.0.10:2181,10.241.0.11:2181,10.241.0.12:2181
- node2节点:
# cat /usr/local/kafka/config/server.properties
broker.id=2
log.dirs=/data/kafka/logs
zookeeper.connect=10.241.0.10:2181,10.241.0.11:2181,10.241.0.12:2181
- node3节点:
# cat /usr/local/kafka/config/server.properties
broker.id=3
log.dirs=/data/kafka/logs
zookeeper.connect=10.241.0.10:2181,10.241.0.11:2181,10.241.0.12:2181
2) 创建数据目录
# mkdir -p /data/kafka/logs
3) 启动kafka(-daemon参数为后台启动,如不加则为前台启动)
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# tail -f /usr/local/kafka/logs/kafkaServer.out
4) 创建topic
###创建topic (只写一个节点就可以,集群之间会自动同步)
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 10.241.0.10:2181 --replication-factor 3 --partitions 3 --topic topicTest
Created topic "topicTest".
###参数解释
--topic : 创建topic名为: topicTest
--replication-factor : 复制因子为3
--partitions : 5个分区
5) 查看现有的topic
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 10.241.0.10:2181,10.241.0.11:2181,10.241.0.12:2181
topicTest
6) 查看topic详细信息
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 10.241.0.10:2181,10.241.0.11:2181,10.241.0.12:2181 --topic topicTest
Topic:topicTest PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topicTest Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: topicTest Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: topicTest Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1
#### 返回值解释
# Partition: 分区
# Leader : 负责读写指定分区的节点
# Replicas : 复制该分区log的节点列表
# Isr : 当前活跃的副本列表
7) 消息测试
#### 生产消息
[root@node1 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.241.0.10:9092,10.241.0.11:9092,10.241.0.12:9092 --topic ConsumerTest
>hello kafka And zookeeper
>My name is baiyongjie
>My blog address is baiyongjie.com
>Welcome to visit!
#### 消费消息
[root@node2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.241.0.10:9092,10.241.0.11:9092,10.241.0.12:9092 --topic ConsumerTest --from-beginning
hello kafka And zookeeper
My name is baiyongjie
My blog address is baiyongjie.com
Welcome to visit!
网友评论