一、为什么需要消息系统
-
1、解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
-
2、冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
-
3、扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
-
4、灵活性 & 峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
-
5、可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
-
6、顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
-
7、缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
-
8、异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
二、消息队列的两种模式
- 1、点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
![](https://img.haomeiwen.com/i13587608/1d1b37d50877cc21.png)
- 2、发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消
息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
![](https://img.haomeiwen.com/i13587608/2960e19e0327527d.png)
三、Kafka 基础架构
3.1、Kafka拓扑结构
![](https://img.haomeiwen.com/i13587608/f713472727527437.png)
3.2、上图中kafka相关术语
-
1、Producer:消息生产者,发布消息到 kafka 集群的终端或服务。
-
2、Broker:代理,kafka集群包含的服务端(每个服务端称之为代理),负责处理消息读写请求及存储消息。
-
3、Topic:每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
-
4、Partition:partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
-
5、Consumer:从 kafka 集群中消费消息的终端或服务。
-
6、Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
-
7、Replica:partition 的副本,保障 partition 的高可用。
-
8、Leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
-
9、Follower:replica 中的一个角色,从 leader 中复制数据。
-
10、Controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller(leader election选举算法)。 -
11、Zookeeper:一个分布式应用程序协调服务,kafka 通过 zookeeper 来存储集群的 meta 信息。
3.3、zookeeper 节点
kafka 在 zookeeper 中的存储结构如下图所示:
![](https://img.haomeiwen.com/i13587608/a325d85d681705a5.png)
四、Kafka 工作流程与文件存储机制
4.1、工作流程
![](https://img.haomeiwen.com/i13587608/80b6d8fd2436bb15.png)
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。
Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
4.2、文件存储机制
![](https://img.haomeiwen.com/i13587608/1abdb1fcf6316b01.png)
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。
每个 segment 对应两个文件:“.index”文件和“.log”文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称 + 分区序号。
例如,first 这个 topic 有三个分区,则其对应的文件夹为 first0、first-1、first-2。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log 文件的结构示意图。
![](https://img.haomeiwen.com/i13587608/bbd22629618472fc.png)
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
四、Zookeeper 在 Kafka 中的作用
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所
有 topic 的分区副本分配和 leader 选举等工作。
Controller 的管理工作都是依赖于 Zookeeper 的。
以下为 partition 的 leader 选举过程:
![](https://img.haomeiwen.com/i13587608/5a88b74e5789ebbe.png)
五、kafka HA
5.1、replication
如Kafka拓扑结构图所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。
Kafka 分配 Replica 的算法如下:
- 1、将所有 broker(假设共 n 个 broker)和待分配的 partition 排序。
- 2、将第 i 个 partition 分配到第(i mod n)个 broker 上。
- 3、将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上。
5.2、leader failover
当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。
当所有 replica 都不工作时,有两种可行的方案:
- 1、等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
- 2、选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
kafka 0.8.* 使用第二种方式。
5.3、broker failover
kafka broker failover 序列图如下所示:
![](https://img.haomeiwen.com/i13587608/afaa2e39a52f04e3.png)
kafka 通过 Controller 来选举 leader,流程说明:
- 1、controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch。
- 2、controller 从 /brokers/ids 节点读取可用broker。
- 3、controller决定set_p,该集合包含宕机 broker 上的所有 partition。
- 4、对 set_p 中的每一个 partition
- 4.1、从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR。
- 4.2、决定新 leader(如4.3节所描述)。
- 4.3、将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点。
- 5、通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令。
5.4、controller failover
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:
- 1、读取并增加 Controller Epoch。
- 2、在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
- 3、在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
- 4、通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
- 5、若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
- 6、通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
- 7、初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
- 8、启动 replicaStateMachine 和 partitionStateMachine。
- 9、将 brokerState 状态设置为 RunningAsController。
- 10、将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
- 11、若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
- 12、若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
参考:
https://www.cnblogs.com/cyfonly/p/5954614.html
https://www.cnblogs.com/xifenglou/p/7251112.html
网友评论