Kafka之分布式消息队列
一、简介
二、生产集群搭建
准备
- 安装JDK的linux服务器三台(JDK版需要与Kafka版本对应好)
- Zookeeper集群
下载
准备好kafka安装包,官网下载地址
创建目录
cd /opt
mkdir kafka #创建安装目录
cd kafka
mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息
解压
tar -zxvf kafka_2.11-1.0.0.tgz -C /opt/kafka/
修改配置
cd /opt/kafka/kafka_2.11-1.0.0/config/
vim server.properties
broker.id=0 #集群内全局唯一,每台服务器设置不同的值
listeners=PLAINTEXT://192.168.xx.xx:9092 #这个IP地址也是与本机相关的,每台服务器上设置为自己的IP地址
log.dirs=/opt/kafka/kafkalogs #存放kafka消息
zookeeper.connect=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181 #zookeeper集群
拷贝安装目录
分发kafka目录到集群所有服务器,同时修改每个文件的broker.id和listeners
启动
#先启动zookeeper集群
bin/kafka-server-start.sh -daemon config/server.properties
测试
创建主题
bin/kafka-topics.sh --create --bootstrap-server 192.168.xx.xx:9092 --replication-factor 3 --partitions 3 --topic test01
查看主题列表
bin/kafka-topics.sh --list --bootstrap-server 192.168.xx.xx:9092
启动控制台生产者
bin/kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test01
启动控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test01 --from-beginning
在生产者控制台输入hello kafka
配置详解
broker.id=0
#每个 broker 都可以用一个唯一的非负整数 id 进行标识; 这个 id 可以作为 broker 的“名字”, 并且它的存在使得 broker 无须混淆 consumers
#就可以迁移到不同的 host/port 上。 你可以选择任意你喜欢的数字作为 id, 只要 id 是唯一的即可
listeners=PLAINTEXT://192.168.xx.xx:9092
#Kafka服务地址
#advertised.listeners=PLAINTEXT://your.host.name:9092
#Kafka注册到Zookeeper的地址,内网不用设置默认使用listeners,
#内外网环境
#listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
#listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://192.168.xx.xx:9093
#advertised.listeners=INTERNAL://192.168.xx.xx:9092,EXTERNAL://<公网ip>:<端口>
#inter.broker.listener.name=INTERNAL
num.network.threads=3
# broker 处理消息的最大线程数,一般情况下不需要去修改
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
socket.send.buffer.bytes=102400
#SO_SNDBUFF 缓存大小, server 进行 socket连接所用
socket.receive.buffer.bytes=102400
#SO_RCVBUFF 缓存大小, server 进行 socket连接时所用
socket.request.max.bytes=104857600
#server允许的最大请求尺寸; 这将避免server溢出, 它应该小于 Java heap size
log.dirs=/opt/kafka/kafkalogs
#kafka 存放数据的路径。这个路径并不是唯一的,可以是多个, 路径之间只需要使用逗号分隔即可;
#每当创建新 partition 时, 都会选择在包含最少 partitions 的路径下进行。
num.partitions=3
#如果创建 topic 时没有给出划分 partitions 个数, 这个数字将是 topic 下 partitions 数目的默认数值。
num.recovery.threads.per.data.dir=1
#每个数据目录用来日志恢复的线程数目
offsets.topic.replication.factor=3
#topic 的 offset 的备份份数。 建议设置更高的数字保证更高的可用性
transaction.state.log.replication.factor=3
#事务主题的复制因子(设置更高以确保可用性)。 内部主题创建将失败,直到群集大小满足此复制因素要求。
transaction.state.log.min.isr=3
#覆盖事务主题的min.insync.replicas配置。
#log.flush.interval.messages=10000
#此项配置指定时间间隔: 强制进行 fsync日志。 例如, 如果这个选项设置为 1, 那么每条消息之后都需要进行 fsync,
#如果设置为 5, 则每 5 条消息就需要进行一次fsync。 一般来说, 建议你不要设置这个值。
#log.flush.interval.ms=1000
#此项配置用来置顶强制进行 fsync 日志到磁盘的时间间隔; 例如, 如果设置为1000, 那么每 1000ms 就需要进行一次fsync。 一般不建议使用这选项
log.retention.hours=168
#log.retention.bytes=1073741824
#每个日志文件删除之前保存的时间。 默认数据保存时间对所有 topic 都一样。
#log.retention.hours和log.retention.bytes 都是用来设置删除日志文件的, 无论哪个属性已经溢出。
#这个属性设置可以在 topic 基本设置时进行覆盖。
log.segment.bytes=1073741824
#topic partition 的日志存放在某个目录下诸多文件中, 这些文件将 partition 的日志切分成一段一段的; 这个属性就是每个文件的最大尺寸;
#当尺寸达到这个数值时, 就会创建新文件。 此设置可以由每个 topic 基础设置时进行覆盖。
log.retention.check.interval.ms=300000
#检查日志分段文件的间隔时间, 以确定是否文件属性到达删除要求
zookeeper.connect=10.160.22.1:2181,10.160.22.2:2181,10.160.22.3:2181
#zookeeper集群
zookeeper.connection.timeout.ms=6000
#客户端等待和 zookeeper 建立连接的最大时间
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
#主要作用是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数=10000。
三、Zookeeper存储结构详解
kafka 元数据存储目录
admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)
brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息
cluster: 存储 kafka 集群信息
config: 存储 broker,client,topic,user 以及 changer 相关的配置信息
consumers: 存放消费者相关信息 (一般为空)
controller: 用于存放控制节点信息 (注意:该节点是一个临时节点,用于 controller 节点注册)
controller_epoch: 用于存放 controller 节点当前的年龄
isr_change_notification: 用于存储 isr 的变更通知 (临时节点,当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更)
latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围
log_dir_event_notification: 日志目录事件通知
admin 目录结构
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /kafka/admin
[delete_topics]
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka/admin/delete_topics
[]
[zk: 127.0.0.1:2181(CONNECTED) 4] get /kafka/admin/delete_topics
null
brokers 目录结构
# broker和topic列表数据
[zk: 127.0.0.1:2181(CONNECTED) 5] ls /kafka/brokers
[ids, seqid, topics]
# 表示当前集群有3个节点
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /kafka/brokers/ids
[1, 2, 3]
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /kafka/brokers/seqid
[]
# 表示集群当前的topic信息
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /kafka/brokers/topics
[__consumer_offsets, appjsonlog_heartbeat, nginx_log,ingress_log ]
# broker详情
[zk: 127.0.0.1:2181(CONNECTED) 10] get /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":9999,"host":"10.0.0.1","timestamp":"1588925944886","port":9092,"version":4}
# topic详情
[zk: 127.0.0.1:2181(CONNECTED) 11] get /kafka/brokers/topics/__consumer_offsets
{"version":2,"partitions":{"1":[2,1,3],"0":[3,2,1],"2":[1,2,3],"adding_replicas":{},"removing_replicas":{}}
[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 2]
# 查看topic某个分区的状态详情
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/1
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/1/state
[]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/__consumer_offsets/partitions/1/state
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":12,"isr":[3,1]}
cluster 目录结构
[zk: 127.0.0.1:2181(CONNECTED) 5] get /kafka/cluster/id
{"version":"1","id":"5C-JZf4vRdqKzlca7Lv7pA"}
cZxid = 0x100000018
ctime = Fri Mar 30 22:21:07 CST 2018
mZxid = 0x100000018
mtime = Fri Mar 30 22:21:07 CST 2018
pZxid = 0x100000018
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0
config 目录结构
[zk: 127.0.0.1:2181(CONNECTED) 12] ls /kafka/config
[brokers, changes, clients, topics, users]
# 需要注意的是,config目录用来存放各种实体的配置,用于使用kafka相关工具对实体进行的配置变更存储
# 因此,一般在kafka集群运行后,如不设置相关动态参数,该目录下的配置一般为空
[zk: 127.0.0.1:2181(CONNECTED) 19] ls /kafka/config/brokers
[]
[zk: 127.0.0.1:2181(CONNECTED) 20] ls /kafka/config/changes
[]
[zk: 127.0.0.1:2181(CONNECTED) 21] ls /kafka/config/clients
[]
[zk: 127.0.0.1:2181(CONNECTED) 22] ls /kafka/config/topics
[__consumer_offsets]
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka/config/users
[]
# 可以看到,存储消费者偏移量的topic在配置中
# 是因为该topic有一些额外的topic级别参数
# 如果我们对topic参数有过动态变更,将会在这里存储
[zk: 127.0.0.1:2181(CONNECTED) 24] get /kafka/config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","compression.type":"producer","cleanup.policy":"compact"}}
controller 目录结构
# 可以看到当前broker-1为集群的controller节点
[zk: 127.0.0.1:2181(CONNECTED) 30] get /kafka/controller
{"version":1,"brokerid":1,"timestamp":"1588833869354"}
controller_epoch 目录结构
# 可以看到controller的年龄是2,说明controller经历过2次变更了
[zk: 127.0.0.1:2181(CONNECTED) 32] get /kafka/controller_epoch
2
四、Kafka原理文章推荐
五、滴滴开源的kafka-manager编译及部署
准备
- 安装JDK1.8+的linux服务器
- Mysql(Drds)
- Mysql建库kafka-manager,执行初始化脚本 create_mysql_table.sql
编译
- 下载源码
- 修改默认管理数据库驱动
#进入下载的源码目录/kafka-manager-master/dao修改pom.xml文件,新增mysql8的依赖,如果需要mysql5也是一样的方法。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
#修改之后到/kafka-manager-master目录下进入cmd命令,执行mvn install,等待一会即可编译成功。
#然后进入kafka-manager-master\web\src\main\resources下将application.yml 以及kafka-manager-master\web\target下的编译成功后的kafka-manager-web-1.0.0-SNAPSHOT.jar上传到服务器。
- 修改application.yml配置文件
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver #MYSQL8 需要在dao文件夹内修改pom.xml里的依赖为mysql8然后重新打包
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
# kafka监控
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor
- 启动
nohup java -jar kafka-manager-web-1.0.0-SNAPSHOT.jar --spring.config.location=./application.yml > /dev/null 2>&1 &
访问 http://localhost:8080,输入帐号及密码进行登录,默认账号是admin/admin(在管理数据库account表内也可以查到)
网友评论