5. Kafka

作者: 随便写写咯 | 来源:发表于2021-06-29 20:18 被阅读0次

3 Kafka

Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家公司中投入生产。

3.1 常用消息队列对比

kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。

RabbitMQ主要用于业务间数据传递, kafka主要用于大数据场合

图片.png

3.2 kafka优势

kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。
支持通过 Kafka 服务器分区消息。
支持 Hadoop 并行数据加载。

图片.png

O(1)就是最低的时空复杂度了,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的 O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标

3.3 kafka角色

Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。

Topic :每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保存于一个或多个 broker 上, 但用户只需指定消息的 topic 即可生产或消费数据而不必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消费者需要订阅相应的 topic 才能消费 topic 中的消息。

Partition :是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时可指定 parition 数量,每个 partition 对应于一个文件夹,该文件夹下存储该partition 的数据和索引文件,为了实现数据的高可用,比如将一个topic的数据放到3个分区0,1,2, 分区 0 的数据分散到不同的 kafka 节点,每一个分区都有一个 broker 作为 leader 和一个 broker作为 Follower。

kafka的分区类似Redis Cluster, 把数据放到不同的服务器上, 加快读取性能

图片.png

分区的优势(分区因子: 就是保留几个数据的副本):
一:实现存储空间的横向扩容,即将多个 kafka 服务器的空间结合利用
二:提升性能,多服务器读写
三:实现高可用,分区 leader 分布在不同的 kafka 服务器,比如分区 0 的leader 为服务器 A,则服务器 B 和服务器 C 为 A 的 follower,而分区 1 的 leader为服务器 B,则服务器 A 和 C 为服务器 B 的 follower,而分区 2 的 leader 为 C,则服务器 A 和 B 为 C 的 follower。

当数据备份多时, 就会消耗磁盘空间, 但是数据安全性非常高. 一般分区因子为2即可, 本身数据有一份, 再有一份备份

Producer:负责发布消息到 Kafka broker。

Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的 group),使用consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。一般一个消息只会被一个消费者消费一次.

3.4 kafka部署

部署三台服务器的高可用 kafka 环境。

部署环境:

Server1:10.0.0.229-kafka-1
Server2:10.0.0.239-kafka-2
Server3:10.0.0.249-kafka-3

版本: kafka_2.13-2.5.1.tgz, 二进制包, Scala:2.13 kafka:2.5.1

下载地址

安装步骤:

  1. Scala语言依赖java, 因此先安装jdk
apt -y install openjdk-8-jdk

java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

  1. 解压二进制包到/apps目录下
mkdir /apps
tar xvf kafka_2.13-2.5.1.tgz -C /apps
  1. 修改配置文件
vim /apps/kafka_2.13-2.5.1/config/server.properties
配置文件说明

broker.id=0 # 默认为0, 每个broker, 也就是kafka服务器在整个集群中的唯一标识, 为正整数
listeners=PLAINTEXT://:9092 # 监听的ip和port, 需要是本地的ip, 不能是0.0.0.0
log.dirs=/tmp/kafka-logs # kafka数据保存的目录, 所有的消息都会存储在该目录中
num.partitions=1 # 设置创建新的 topic 默认分区数量, 一般按照服务器数量, 设置分区数量, 为每个topic创建几个partition. 把数据放在不同的kafka保存, 配合分区因子, 生成指定数量的副本做备份
num.network.threads=3 # 开启的网络线程
num.io.threads=8 # 开启的io线程
log.retention.hours=168 # 设置 kafka 中消息保留时间,默认为 168 小时即 7 天
zookeeper.connect=localhost:2181 # kafka依赖zookeeper实现高可用, 服务启动时, 会把自身信息注册到zookeeper, 因此, kafka集群依赖于zookeeper
offsets.topic.replication.factor=1 # topic的副本因子, topic会按照partition数量存到不同的partition, 不同的partition再根据分区因子, 存到不同的kakfa broker
kafka-1 10.0.0.229

broker.id=229
listeners=PLAINTEXT://10.0.0.229:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 # java程序会尝试连接第一个地址, 如果不可用, 会报错, 但是会继续连接下一个地址
kafka-2 10.0.0.239

broker.id=239
listeners=PLAINTEXT://10.0.0.239:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
kafka-3 10.0.0.249

broker.id=249
listeners=PLAINTEXT://10.0.0.249:9092
log.dirs=/apps/kafka_2.13-2.5.1/data
num.partitions=3
num.network.threads=3
num.io.threads=8
log.retention.hours=168
zookeeper.connect=10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
  1. 创建数据保存目录
mkdir /apps/kafka_2.13-2.5.1/data
  1. 部署zookeeper集群

集群环境

Zoo1 - 10.0.0.209
Zoo2 - 10.0.0.199
Zoo3 - 10.0.0.189

部署流程参考Zookeeper一文

  1. 验证Zookeeper集群状态和角色
root@Zoo1:/apps# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
root@Zoo2:/apps/zookeeper/conf# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
root@Zoo3:/apps/zookeeper/conf# /apps/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
  1. 各节点启动kafka
  • 节点1
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties

[2021-06-29 18:55:08,480] INFO [KafkaServer id=229] started (kafka.server.KafkaServer)
  • 节点2
root@kafka-2:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties

[2021-06-29 18:49:57,381] INFO [KafkaServer id=239] started (kafka.server.KafkaServer)
  • 节点3
root@kafka-3:~# /apps/kafka_2.13-2.5.1/bin/kafka-server-start.sh -daemon /apps/kafka_2.13-2.5.1/config/server.properties

[2021-06-29 18:49:59,196] INFO [KafkaServer id=249] started (kafka.server.KafkaServer)
  • 如果启动失败, 可以查看kafka日志
cat /apps/kafka_2.13-2.5.1/logs/server.log
  • kafka监听9092端口, 应用程序需要连接到9092
  1. 通过zookeeper客户端, 查看kafka注册信息
图片.png 图片.png 图片.png

1、Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信息都会写入 Zookeeper 的 Node 节点中;
2、Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0.9 以后 offset存储在本地。
3、Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个Leader,这个是依托于 Zookeeper 的选举机制实现的;

  • 到此, kafka和zookeeper部署完成, 只需要提供kafka和zookeeper服务器的地址给开发人员, 即可通过应用程序连接

3.5 读写数据

官网介绍: Apache Kafka

3.5.1 创建topic

创建名为 kafka-test,partitions(分区)为 3,replication(每个分区的副本数/每个
分区的分区因子)为 3 的 topic(主题), 在任意 kafka 服务器操作即可

在kafka-1创建topic

root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --create --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --partitions 3 --replication-factor 3 --topic kafka-test
Created topic kafka-test.

3.5.2 验证topic

状态说明:kafka-test 有三个分区分别为0、1、2,分区0的leader是229(broker.id),
分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。

在任意 kafka 服务器操作即可

root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --describe --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --topic kafka-test
Topic: kafka-test   PartitionCount: 3   ReplicationFactor: 3    Configs: 
    Topic: kafka-test   Partition: 0    Leader: 229 Replicas: 229,239,249   Isr: 229,239,249
    Topic: kafka-test   Partition: 1    Leader: 239 Replicas: 239,249,229   Isr: 239,249,229
    Topic: kafka-test   Partition: 2    Leader: 249 Replicas: 249,229,239   Isr: 249,229,239

3.5.3 获取所有topic

root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --list --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181
kafka-test

图片.png

3.5.4 测试消息发送

配置kafka-tool工具

图片.png 图片.png
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-console-producer.sh --broker-list 10.0.0.229:9092,10.0.0.239:9092,10.0.0.249:9092 --topic kafka-test
>hello1
>hello2
>hello3


3.5.5 测试获取消息

图片.png 图片.png
root@kafka-2:~# /apps/kafka_2.13-2.5.1/bin/kafka-console-consumer.sh --topic kafka-test --bootstrap-server 10.0.0.229:9092,10.0.0.239:9092,10.0.0.249:9092 --from-beginning
hello1
hello2
hello3

3.5.6 topic删除

图片.png
root@kafka-1:~# /apps/kafka_2.13-2.5.1/bin/kafka-topics.sh --delete --zookeeper 10.0.0.209:2181,10.0.0.199:2181,10.0.0.189:2181 --topic kafka-test
Topic kafka-test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

4 kafka总结

  1. 分区:

基于分区, 把数据拆成多分, 存到不同的服务器topic, 提高并行写入性能和读取性能, 因为磁盘和网络的IO是固定的. 分区写到不同的服务器会提高写速度. 三个服务器, 每个服务器同时写300M数据, 肯定比一个服务器, 写300M数据效率高.

每个分区都可以设置不同的分区因子, 作为备份, 来实现数据的高可用, 一个分区作为leader, 另外的作为follower, leader的选举是结合Zookeeper实现

  1. 数据顺序写入, 先进来的数据先写, 尽可能利用磁盘IO, 即使使用机械磁盘, 速度也很快

3.MMAP: 针对写操作的内存映射技术, 尤其是针对机械磁盘. kafka为了优化写入性能使用MMAP, 数据先写入内存, 并不是直接写入磁盘, 内核收到来自客户端的请求, 直接在内存映射一个区域, 先把数据先缓存到内存, 然后通知应用程序, 数据写入成功, 之后再往磁盘写入, 即使是机械磁盘, 每秒也可以写几百M的数据, 因此, 即使是几个G的内存数据, 几秒后也会完成磁盘写入

  1. 对于读操作的优化, kakfa除了顺序读, 还会支持零拷贝, 正常情况, 数据从磁盘读取后先放到内核的内存缓冲区, 然后拷贝到进程的内存缓冲区. 而零拷贝会在数据拷贝到内核的内存缓冲区后, 返回客户端数据准备好, 即可完成读取, 无需再拷贝到进程的内存缓存区域

相关文章

  • 5. Kafka

    3 Kafka Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家公司...

  • docker安装kafka

    1.拉取zookeeper 2.拉取kafka 3.查看镜像 4.启动zookeeper 5.启动kafka 6....

  • kafka的简易搭建

    ubuntu16.04 1.java环境安装 2.下载kafka 3.解压 4.修改kafka配置文件 5.运行

  • (九)kafka streaming 整合(有receiver)

    1.启动zookeeper 2.启动kafka服务 3.创建kafka的topic 4.启动生产者 5.启动消费者...

  • 5. ActiveMQ平滑迁移到kafka

    直入主题,不讨论为什么迁移,直接谈迁移方案。 既然是从AMQ(AtiveMQ的简称)迁移到kafka,那么迁移过程...

  • Kafka入门系列—5. Topic的分区

    深入分区 Topic至少有一个分区、可以有多个分区。通过创建时的参数--partitions来指定分区数。 消息被...

  • Kafka详细的设计和生态系统

    Kafka详细的设计和生态系统 Kafka生态系统 - Kafka核心,Kafka流,Kafka连接,Kafka ...

  • kafka全面认知

    什么是Kafka[#---kafka] Kafka的应用场景[#kafka-----] Kafka的架构[#kaf...

  • Kafka & NSQ

    Kafka & NSQ Kafka kafka struct kafka & consumer group 2ka...

  • kafka学习系列

    Kafka学习总结(一)——Kafka简介 Kafka学习总结(二)——Kafka设计原理 Kafka学习总结(三...

网友评论

      本文标题:5. Kafka

      本文链接:https://www.haomeiwen.com/subject/qsqfultx.html