整体架构
实现细节
队列创建
创建topic的时候指定分区(partition)的数量和副本的个数。
生产者发送
kafka对消息以topic的形式分类,消息发送的时候按key发送到kafka的某一个分区。
1.如果没有Key值则进行轮询发送。
2.如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区。
比如根据用户id进行hash,所以一个队列的消息可以根据分区的数量被发送到不同的机器处理,体现了分布式设计理念。
消息的存储
kafka每个分区的消息是以追加的形式,记录到日志文件尾部。每个分区下对应一个日志目录,日志目录下有多个日志分段LogSegment,LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。
每个segment文件名为上一个segment文件最后一条消息的offset,每个index文件采用稀疏索引,比如如果有0-100条消息,index第一条就是0,物理地址;第二条10,物理地址...这样。
所以查找消息时,可以用二分法找到分段文件,在到分段文件的index文件去二分法找到对应的存储地址,再顺序遍历就好。
消费者获取
kafka采用拉模式获取消息,可以分为多个消费组,每个消费组可以根据offest去获取自己需要的消息。
需要注意的是,一个分区的消息只会分配到一个消费节点。比如如果有4个分区,但是有5台消费者节点,会出现前4个消费者分别消费一个分区,第5个消费者空闲。
高吞吐原因
1.顺序写入。每次都是追加数据,不会删除。日志过多了可以按时间等去删除老的数据。
2.内存映射文件。所有的消息都变成一个的文件,通过mmap方式直接读写pageCache内核缓存,然后flush刷到磁盘,少了传统的还需要把文件从内核空间拷贝到用户空间的过程。
3.二进制消息格式。这样在生产者、消费者、节点之间可以自由传输,无需转换。
高可用原因
1.分布式消息队列,一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
2. HA 机制
那HA机制的leader怎么选取的呢?
副本是定时从leader拉取消息还是leader有消息了通知副本拉取呢?
首先选出符合要求的副本:
副本会有单独的线程去从leader上去拉去消息。当follower赶上leader的HW(已提交)进度时,就会保持或加入到ISR列表里。ISR列表存在zookeeper上。
这里的赶上进度的定义是可配置的,通过replica.lag.max.messages 落后的消息个数;replica.lag.time.max.ms 多长时间没有发送FetchQuest请求拉去leader数据。
其次在合格副本中选举:
如果leader不可用,就会从ISR表里选一个出来作为leader。这里的选举过程就是在ZK创建节点,原先leader在zk上是临时节点,Followers对此节点注册监听,当leader宕机时,ISR里的Followers都会尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
注意点:
实际的过程比这个复杂一些,因为实际情况可能有大量分片,而每个分片都有副本,当某个节点宕机时,可能会造成大量Watch事件被触发,导致Zookeeper负载会过重。zk是不适合大量写操作的。
参考:
kafka的leader选举过程
想一想
1.中途修改分区数量是否可行,会有什么影响
2.删除消息怎么实现的
3.broker和分区是什么关系
4.消息读是从磁盘读的吗
下次再继续开帖总结吧~
网友评论