3 Kafka
Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家公司中投入生产。
3.1 常用消息队列对比
kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。
RabbitMQ主要用于业务间数据传递, kafka主要用于大数据场合
![](https://img.haomeiwen.com/i18380359/625129de60a1f51e.png)
3.2 kafka优势
kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。
支持通过 Kafka 服务器分区消息。
支持 Hadoop 并行数据加载。
![](https://img.haomeiwen.com/i18380359/49d4a2f55185a138.png)
O(1)就是最低的时空复杂度了,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的 O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标
3.3 kafka角色
Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
Topic :每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保存于一个或多个 broker 上, 但用户只需指定消息的 topic 即可生产或消费数据而不必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消费者需要订阅相应的 topic 才能消费 topic 中的消息。
Partition :是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时可指定 parition 数量,每个 partition 对应于一个文件夹,该文件夹下存储该partition 的数据和索引文件,为了实现数据的高可用,比如将一个topic的数据放到3个分区0,1,2, 分区 0 的数据分散到不同的 kafka 节点,每一个分区都有一个 broker 作为 leader 和一个 broker作为 Follower。
kafka的分区类似Redis Cluster, 把数据放到不同的服务器上, 加快读取性能
![](https://img.haomeiwen.com/i18380359/3d7408efe433eb05.png)
分区的优势(分区因子: 就是保留几个数据的副本):
一:实现存储空间的横向扩容,即将多个 kafka 服务器的空间结合利用
二:提升性能,多服务器读写
三:实现高可用,分区 leader 分布在不同的 kafka 服务器,比如分区 0 的leader 为服务器 A,则服务器 B 和服务器 C 为 A 的 follower,而分区 1 的 leader为服务器 B,则服务器 A 和 C 为服务器 B 的 follower,而分区 2 的 leader 为 C,则服务器 A 和 B 为 C 的 follower。
当数据备份多时, 就会消耗磁盘空间, 但是数据安全性非常高. 一般分区因子为2即可, 本身数据有一份, 再有一份备份
Producer:负责发布消息到 Kafka broker。
Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的 group),使用consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。一般一个消息只会被一个消费者消费一次.
3.4 kafka部署
部署三台服务器的高可用 kafka 环境。
部署环境:
Server1:10.0.0.229-kafka-1
Server2:10.0.0.239-kafka-2
Server3:10.0.0.249-kafka-3
版本: kafka_2.13-2.5.1.tgz, 二进制包, Scala:2.13 kafka:2.5.1
安装步骤:
- Scala语言依赖java, 因此先安装jdk
apt -y install openjdk-8-jdk
java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)
- 解压二进制包到/apps目录下
mkdir /apps
tar xvf kafka_2.13-2.5.1.tgz -C /apps
- 修改配置文件
vim /apps/kafka_2.13-2.5.1/config/server.properties
配置文件说明
broker.id=0 # 默认为0, 每个broker, 也就是kafka服务器在整个集群中的唯一标识, 为正整数
listeners=PLAINTEXT://:9092 # 监听的ip和port, 需要是本地的ip, 不能是0.0.0.0
log.dirs=/tmp/kafka-logs # kafka数据保存的目录, 所有的消息都会存储在该目录中
num.partitions=1 # 设置创建新的 topic 默认分区数量, 一般按照服务器数量, 设置分区数量, 为每个topic创建几个partition. 把数据放在不同的kafka保存, 配合分区因子, 生成指定数量的副本做备份
num.network.threads=3 # 开启的网络线程
num.io.threads=8 # 开启的io线程
log.retention.hours=168 # 设置 kafka 中消息保留时间,默认为 168 小时即 7 天
zookeeper.connect=localhost:2181 # kafka依赖zookeeper实现高可用, 服务启动时, 会把自身信息注册到zookeeper, 因此, kafka集群依赖于zookeeper
offsets.topic.replication.factor=1 # topic的副本因子, topic会按照partition数量存到不同的partition, 不同的partition再根据分区因子, 存到不同的kakfa broker
kafka-1 10.0.0.229
broker.id=229
listeners=PLAINTEXT://10.0.0.229:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 # java程序会尝试连接第一个地址, 如果不可用, 会报错, 但是会继续连接下一个地址
kafka-2 10.0.0.239
broker.id=239
listeners=PLAINTEXT://10.0.0.239:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
kafka-3 10.0.0.249
broker.id=249
listeners=PLAINTEXT://10.0.0.249:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
- 创建数据保存目录
mkdir /apps/kafka_2.13-2.5.1/data
- 部署zookeeper集群
集群环境
Zoo1 - 10.0.0.209
Zoo2 - 10.0.0.199
Zoo3 - 10.0.0.189
部署流程参考Zookeeper一文
- 验证Zookeeper集群状态和角色
root@Zoo1:/apps# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
root@Zoo2:/apps/zookeeper/conf# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
root@Zoo3:/apps/zookeeper/conf# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
- 各节点启动kafka
- 节点1
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties
[2021-06-29 18:55:08,480] INFO [KafkaServer id=229] started (kafka.server.KafkaServer)
- 节点2
root@kafka-2:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties
[2021-06-29 18:49:57,381] INFO [KafkaServer id=239] started (kafka.server.KafkaServer)
- 节点3
root@kafka-3:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties
[2021-06-29 18:49:59,196] INFO [KafkaServer id=249] started (kafka.server.KafkaServer)
- 如果启动失败, 可以查看kafka日志
cat /apps/kafka_2.13-2.5.1/logs/server.log
- kafka监听9092端口, 应用程序需要连接到9092
- 通过zookeeper客户端, 查看kafka注册信息
![](https://img.haomeiwen.com/i18380359/aafb12c717b0a649.png)
![](https://img.haomeiwen.com/i18380359/dae97e59ceef885e.png)
![](https://img.haomeiwen.com/i18380359/4bc0f2d33c66653f.png)
1、Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信息都会写入 Zookeeper 的 Node 节点中;
2、Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0.9 以后 offset存储在本地。
3、Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个Leader,这个是依托于 Zookeeper 的选举机制实现的;
- 到此, kafka和zookeeper部署完成, 只需要提供kafka和zookeeper服务器的地址给开发人员, 即可通过应用程序连接
3.5 读写数据
官网介绍: Apache Kafka
3.5.1 创建topic
创建名为 kafka-test,partitions(分区)为 3,replication(每个分区的副本数/每个
分区的分区因子)为 3 的 topic(主题), 在任意 kafka 服务器操作即可
在kafka-1创建topic
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --create --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --partitions 3 --replication-factor 3 --topic kafka-test
Created topic kafka-test.
3.5.2 验证topic
状态说明:kafka-test 有三个分区分别为0、1、2,分区0的leader是229(broker.id),
分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。
在任意 kafka 服务器操作即可
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --describe --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --topic kafka-test
Topic: kafka-test PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: kafka-test Partition: 0 Leader: 229 Replicas: 229,239,249 Isr: 229,239,249
Topic: kafka-test Partition: 1 Leader: 239 Replicas: 239,249,229 Isr: 239,249,229
Topic: kafka-test Partition: 2 Leader: 249 Replicas: 249,229,239 Isr: 249,229,239
3.5.3 获取所有topic
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --list --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
kafka-test
![](https://img.haomeiwen.com/i18380359/8e6b765d414b3c85.png)
3.5.4 测试消息发送
配置kafka-tool工具
![](https://img.haomeiwen.com/i18380359/49be7f66292a0536.png)
![](https://img.haomeiwen.com/i18380359/753ada89d74d913f.png)
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-console-producer.sh --broker-list 10.0.0.229:9092,10.0.0.239:9092,10.0.0.249:9092 --topic kafka-test
>hello1
>hello2
>hello3
3.5.5 测试获取消息
![](https://img.haomeiwen.com/i18380359/8037f0536dc6aca4.png)
![](https://img.haomeiwen.com/i18380359/fd50eb948d15cfff.png)
root@kafka-2:~# /apps/kafka_2.13-2.5.1/bin/kafka-console-consumer.sh --topic kafka-test --bootstrap-server 10.0.0.229:9092,10.0.0.239:9092,10.0.0.249:9092 --from-beginning
hello1
hello2
hello3
3.5.6 topic删除
![](https://img.haomeiwen.com/i18380359/c55920485aeacff3.png)
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --delete --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --topic kafka-test
Topic kafka-test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
4 kafka总结
- 分区:
基于分区, 把数据拆成多分, 存到不同的服务器topic, 提高并行写入性能和读取性能, 因为磁盘和网络的IO是固定的. 分区写到不同的服务器会提高写速度. 三个服务器, 每个服务器同时写300M数据, 肯定比一个服务器, 写300M数据效率高.
每个分区都可以设置不同的分区因子, 作为备份, 来实现数据的高可用, 一个分区作为leader, 另外的作为follower, leader的选举是结合Zookeeper实现
- 数据顺序写入, 先进来的数据先写, 尽可能利用磁盘IO, 即使使用机械磁盘, 速度也很快
3.MMAP: 针对写操作的内存映射技术, 尤其是针对机械磁盘. kafka为了优化写入性能使用MMAP, 数据先写入内存, 并不是直接写入磁盘, 内核收到来自客户端的请求, 直接在内存映射一个区域, 先把数据先缓存到内存, 然后通知应用程序, 数据写入成功, 之后再往磁盘写入, 即使是机械磁盘, 每秒也可以写几百M的数据, 因此, 即使是几个G的内存数据, 几秒后也会完成磁盘写入
- 对于读操作的优化, kakfa除了顺序读, 还会支持零拷贝, 正常情况, 数据从磁盘读取后先放到内核的内存缓冲区, 然后拷贝到进程的内存缓冲区. 而零拷贝会在数据拷贝到内核的内存缓冲区后, 返回客户端数据准备好, 即可完成读取, 无需再拷贝到进程的内存缓存区域
网友评论