美文网首页
Kafka集群搭建

Kafka集群搭建

作者: 鋆坤 | 来源:发表于2020-07-10 19:09 被阅读0次

Kafka之分布式消息队列

一、简介

Kafka官网

二、生产集群搭建

准备

  • 安装JDK的linux服务器三台(JDK版需要与Kafka版本对应好)
  • ​Zookeeper集群

下载

准备好kafka安装包,官网下载地址

创建目录

cd /opt
mkdir kafka       #创建安装目录
cd kafka
mkdir kafkalogs   #创建kafka消息目录,主要存放kafka消息

解压

tar -zxvf kafka_2.11-1.0.0.tgz -C /opt/kafka/

修改配置

cd /opt/kafka/kafka_2.11-1.0.0/config/
vim server.properties
broker.id=0  #集群内全局唯一,每台服务器设置不同的值
listeners=PLAINTEXT://192.168.xx.xx:9092 #这个IP地址也是与本机相关的,每台服务器上设置为自己的IP地址
log.dirs=/opt/kafka/kafkalogs #存放kafka消息
zookeeper.connect=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 #zookeeper集群

拷贝安装目录

分发kafka目录到集群所有服务器,同时修改每个文件的broker.id和listeners

启动

#先启动zookeeper集群
bin/kafka-server-start.sh -daemon config/server.properties

测试

创建主题 
bin/kafka-topics.sh --create --bootstrap-server 192.168.xx.xx:9092 --replication-factor 3 --partitions 3 --topic test01

查看主题列表
bin/kafka-topics.sh --list --bootstrap-server 192.168.xx.xx:9092
        
启动控制台生产者
bin/kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test01

启动控制台消费者        
bin/kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test01 --from-beginning

在生产者控制台输入hello kafka

配置详解

broker.id=0
#每个 broker 都可以用一个唯一的非负整数 id 进行标识; 这个 id 可以作为 broker 的“名字”, 并且它的存在使得 broker 无须混淆 consumers
#就可以迁移到不同的 host/port 上。 你可以选择任意你喜欢的数字作为 id, 只要 id 是唯一的即可

listeners=PLAINTEXT://192.168.xx.xx:9092
#Kafka服务地址

#advertised.listeners=PLAINTEXT://your.host.name:9092
#Kafka注册到Zookeeper的地址,内网不用设置默认使用listeners,
#内外网环境
#listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
#listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://192.168.xx.xx:9093
#advertised.listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://<公网ip>:<端口>
#inter.broker.listener.name=INTERNAL

num.network.threads=3
# broker 处理消息的最大线程数,一般情况下不需要去修改

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

socket.send.buffer.bytes=102400
#SO_SNDBUFF 缓存大小, server 进行 socket连接所用

socket.receive.buffer.bytes=102400
#SO_RCVBUFF 缓存大小, server 进行 socket连接时所用

socket.request.max.bytes=104857600
#server允许的最大请求尺寸; 这将避免server溢出, 它应该小于 Java heap size

log.dirs=/opt/kafka/kafkalogs
#kafka 存放数据的路径。这个路径并不是唯一的,可以是多个, 路径之间只需要使用逗号分隔即可; 
#每当创建新 partition 时, 都会选择在包含最少 partitions 的路径下进行。

num.partitions=3
#如果创建 topic 时没有给出划分 partitions 个数, 这个数字将是 topic 下 partitions 数目的默认数值。

num.recovery.threads.per.data.dir=1
#每个数据目录用来日志恢复的线程数目

offsets.topic.replication.factor=3
#topic 的 offset 的备份份数。 建议设置更高的数字保证更高的可用性
transaction.state.log.replication.factor=3
#事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求。
transaction.state.log.min.isr=3
#覆盖事务主题的min.insync.replicas配置。


#log.flush.interval.messages=10000
#此项配置指定时间间隔: 强制进行 fsync日志。 例如, 如果这个选项设置为 1, 那么每条消息之后都需要进行 fsync, 
#如果设置为 5, 则每 5 条消息就需要进行一次fsync。 一般来说, 建议你不要设置这个值。 

#log.flush.interval.ms=1000
#此项配置用来置顶强制进行 fsync 日志到磁盘的时间间隔; 例如, 如果设置为1000, 那么每 1000ms 就需要进行一次fsync。 一般不建议使用这选项

log.retention.hours=168
#log.retention.bytes=1073741824
#每个日志文件删除之前保存的时间。 默认数据保存时间对所有 topic 都一样。
#log.retention.hours和log.retention.bytes 都是用来设置删除日志文件的, 无论哪个属性已经溢出。
#这个属性设置可以在 topic 基本设置时进行覆盖。

log.segment.bytes=1073741824
#topic partition 的日志存放在某个目录下诸多文件中, 这些文件将 partition 的日志切分成一段一段的; 这个属性就是每个文件的最大尺寸;
#当尺寸达到这个数值时, 就会创建新文件。 此设置可以由每个 topic 基础设置时进行覆盖。

log.retention.check.interval.ms=300000
#检查日志分段文件的间隔时间, 以确定是否文件属性到达删除要求

zookeeper.connect=10.160.22.1:2181,10.160.22.2:2181,10.160.22.3:2181
#zookeeper集群

zookeeper.connection.timeout.ms=6000
#客户端等待和 zookeeper 建立连接的最大时间

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
#主要作用是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数=10000。

三、Zookeeper存储结构详解

kafka 元数据存储目录

admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)
brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息
cluster: 存储 kafka 集群信息
config: 存储 broker,client,topic,user 以及 changer 相关的配置信息
consumers: 存放消费者相关信息 (一般为空)
controller: 用于存放控制节点信息 (注意:该节点是一个临时节点,用于 controller 节点注册)
controller_epoch: 用于存放 controller 节点当前的年龄
isr_change_notification: 用于存储 isr 的变更通知 (临时节点,当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更)
latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围
log_dir_event_notification: 日志目录事件通知

admin 目录结构

[zk: 127.0.0.1:2181(CONNECTED) 2] ls /kafka/admin
[delete_topics]
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka/admin/delete_topics
[]
[zk: 127.0.0.1:2181(CONNECTED) 4] get  /kafka/admin/delete_topics
null

brokers 目录结构

# broker和topic列表数据
[zk: 127.0.0.1:2181(CONNECTED) 5] ls /kafka/brokers
[ids, seqid, topics]

# 表示当前集群有3个节点
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /kafka/brokers/ids
[1, 2, 3]
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /kafka/brokers/seqid
[]
# 表示集群当前的topic信息
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /kafka/brokers/topics
[__consumer_offsets, appjsonlog_heartbeat, nginx_log,ingress_log ]

# broker详情
[zk: 127.0.0.1:2181(CONNECTED) 10] get  /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":9999,"host":"10.0.0.1","timestamp":"1588925944886","port":9092,"version":4}

# topic详情
[zk: 127.0.0.1:2181(CONNECTED) 11] get  /kafka/brokers/topics/__consumer_offsets
{"version":2,"partitions":{"1":[2,1,3],"0":[3,2,1],"2":[1,2,3],"adding_replicas":{},"removing_replicas":{}}


[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 2]

# 查看topic某个分区的状态详情
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/1
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/1/state
[]
[zk: localhost:2181(CONNECTED) 8] get  /brokers/topics/__consumer_offsets/partitions/1/state
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":12,"isr":[3,1]}

cluster 目录结构

[zk: 127.0.0.1:2181(CONNECTED) 5] get   /kafka/cluster/id
{"version":"1","id":"5C-JZf4vRdqKzlca7Lv7pA"}
cZxid = 0x100000018
ctime = Fri Mar 30 22:21:07 CST 2018
mZxid = 0x100000018
mtime = Fri Mar 30 22:21:07 CST 2018
pZxid = 0x100000018
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0

config 目录结构

[zk: 127.0.0.1:2181(CONNECTED) 12] ls /kafka/config
[brokers, changes, clients, topics, users]

# 需要注意的是,config目录用来存放各种实体的配置,用于使用kafka相关工具对实体进行的配置变更存储
# 因此,一般在kafka集群运行后,如不设置相关动态参数,该目录下的配置一般为空
[zk: 127.0.0.1:2181(CONNECTED) 19] ls /kafka/config/brokers
[]
[zk: 127.0.0.1:2181(CONNECTED) 20] ls /kafka/config/changes
[]
[zk: 127.0.0.1:2181(CONNECTED) 21] ls /kafka/config/clients
[]
[zk: 127.0.0.1:2181(CONNECTED) 22] ls /kafka/config/topics
[__consumer_offsets]
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka/config/users
[]

# 可以看到,存储消费者偏移量的topic在配置中
# 是因为该topic有一些额外的topic级别参数
# 如果我们对topic参数有过动态变更,将会在这里存储
[zk: 127.0.0.1:2181(CONNECTED) 24] get /kafka/config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","compression.type":"producer","cleanup.policy":"compact"}}

controller 目录结构

# 可以看到当前broker-1为集群的controller节点
[zk: 127.0.0.1:2181(CONNECTED) 30] get  /kafka/controller
{"version":1,"brokerid":1,"timestamp":"1588833869354"}

controller_epoch 目录结构

# 可以看到controller的年龄是2,说明controller经历过2次变更了
[zk: 127.0.0.1:2181(CONNECTED) 32] get  /kafka/controller_epoch
2

四、Kafka原理文章推荐

震惊了!原来这才是kafka!

五、滴滴开源的kafka-manager编译及部署

官方GitHub
官方用户手册

准备

  • 安装JDK1.8+的linux服务器
  • ​Mysql(Drds)
  • ​Mysql建库kafka-manager,执行初始化脚本 create_mysql_table.sql

编译

  • 下载源码
  • 修改默认管理数据库驱动
#进入下载的源码目录/kafka-manager-master/dao修改pom.xml文件,新增mysql8的依赖,如果需要mysql5也是一样的方法。

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.13</version>
</dependency>

#修改之后到/kafka-manager-master目录下进入cmd命令,执行mvn install,等待一会即可编译成功。
#然后进入kafka-manager-master\web\src\main\resources下将application.yml 以及kafka-manager-master\web\target下的编译成功后的kafka-manager-web-1.0.0-SNAPSHOT.jar上传到服务器。
  • 修改application.yml配置文件
server:
  port: 8080
  tomcat:
    accept-count: 100
    max-connections: 1000
    max-threads: 20
    min-spare-threads: 20
 
spring:
  application:
    name: kafkamanager
  datasource:
    kafka-manager:
      jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver #MYSQL8 需要在dao文件夹内修改pom.xml里的依赖为mysql8然后重新打包
  main:
    allow-bean-definition-overriding: true
 
  profiles:
    active: dev
 
logging:
  config: classpath:logback-spring.xml
 
# kafka监控
kafka-monitor:
  enabled: true
  notify-kafka:
    cluster-id: 95
    topic-name: kmo_monitor
  • 启动
nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &

访问 http://localhost:8080,输入帐号及密码进行登录,默认账号是admin/admin(在管理数据库account表内也可以查到)

相关文章

网友评论

      本文标题:Kafka集群搭建

      本文链接:https://www.haomeiwen.com/subject/voqmcktx.html