主要是想学习一个高性能的,高可用的分布式消息系统是如何设计和考虑的?
先写提纲,后面自问自答
生产者的高性能如何保障?
- 如何把海量的数据快速发送完毕
- 如何确保数据被接收
- 发送过程中,网络异常和抖动怎么办
- 如果重复发送了数据怎么处理?
设计一个client 要考虑的问题:
-
异常处理的能力。
由于服务端是一个和黑子,很可能存在诸多的问题,比如网络性能,网络异常,服务器宕机等,
需要在诸多的不确定下,实现良好的异常处理机制。
1)如何确保,消息一定被接收到。应答ack是一个常用的方法。对于发送出去的消息,kaf 放在一个inflightRequest队列里面。
如果收到客户端的ack,就视为真正的发送成功,否则则启动重试策略,比如一段时间没有收到ack就可以发起重试。
当然是否需要ack,也是可以配置的。根据具体的需求,kaf可以设置几种ack的强度。0,不需要。1,需要leader回复。-1,需要 leader +ISR 回复。
这在存储的高可用性的时候会有详细的涉及。 -
高效的生产能力。
比如对接用户行为,可能行为流数据源源不断的需要写入到kafka,这个时候对client的性能上是有要求的。
从设计高性能的client端来看, client 的性能消耗可能存在的地方:
1) 消息预处理序列化成字节码
2) 构建网络连接,发送字节码。
涉及到网络发送的性能优化, 一个是建立长连接,避免不断建立连接消耗时间和系统资源;
使用batch的模式,一次多发;使用零拷贝的技术,尽量不要多次从 用户态到内核太的拷贝数据。
使用零拷贝,可以收到数据socket-发送数据socket直接拷贝。
kafka client,基本上用到了上面提到的tip。
kafka是一个异步发送的模型, 需要发送的数据,先存在本地的一个buffPool里面,
然后等待一段时间,或者达到一定数量,就一起发送,类似于大巴车待客出发的模式。
调用client.close()会强制刷新
-
自动处理服务端变化的能力。
服务端1开始10个服务器,后来扩展到20个服务器,client需要有自动rebalance的能力。
kafka 提供了几种模式,可以指定分区,也可以不指定, 不指定的时候,需要获取集群的metadata,
然后根据meta的信息来决定待发送的 leader是否 alive等。
上面说的是扩容的情况,其实服务端的变化,还包含宕机和恢复。 kafka都可以通过metadata接口来感知,
并作出相应的变化。 -
如果product 自己宕机了怎么办?
现有文档对这个问题涉及的比较少,只是在 exactly once 有相关的论述。
如果product 发送A 的过程中,自己宕机了,那么恢复之后如何知道,这条A 是否需要从新发送呢?
可以记录A消息是否收到ACK,如果收到不重发,如果不收到重发。
但是有可能ACK是中途丢失了了,重复发送,kafka 就可能收到两份A重复的。 这就比较麻烦了,
这个时候需要给消息一个全局的ID,kafka判断A是否已经重复。
所以说 client 无法做 exactly once的,只能靠kafka 去判断,两者还需配合。
接收者高性能+ 高可用如何保障
- 是如何存储数据的
- 是如何做扩展的
- 数据一致性是如何保证的
- 分片和副本是如何做到高可用的
- 读/写是如何分别做到高性能的
- 历史数据会永远保存吗?
- 先看存储的具体骚操作
从写入1台,到写入 3台server。
分布式系统的三宝,分片,副本,一致性。
比如有个topA, 那么他会存在 3台服务器上,都是以文件的方式存储。
A-1, 2: A-2, 3:A-3;
分片的方法就是大规模横向扩展, 这样就能更具需求加机器,每个机器都分担一点压力。
我们知道,在只有一台存储服务器的时候,我们需要给他做备份,在分布式里面也一样。
每一个分片可以类比与一台电脑,我们需要给他做一个备份,专业术语就是副本。
副本高效的同步一直是分布式系统的一大难题,而 leader-follower 模型是相对简单的,在leader 宕机的时候采用一半选举机制来选出新的leader。
首先,我们要明确leader的作用,就是一切read/write的接口,follow只负责做备份,不对外提供服务。
kaf的存储方式非常值得借鉴,如何实现高效的文件读写系统。
比如,要写入a,b,c,d,e,f,g 7条数据到1个分区我们可以看看是如何实现的。
原理:要想读的快,必须要靠索引。
0.log, 文件存放 a,b
3.log, 存放 c,d
5.log, 存放 e,f
7.log,存放 g。
现在需要消费 f,offset = 6 的数据。
总不可能对所有文件进行扫描,这个时候就需要索引文件了。
0.index,3. index. 5.index, 7.index
index 的文件名为 offset 的偏移量。
比如offset = 6,的数据一定是在 5.index 里面,这里存了5-7的数据。
锁定index文件,是一个二分查找。
那么 5.index 的内容又是什么呢?
5-> 第一行
6-> 第二行
index 文件肯定是倒排索引了,这个索引是 log 文件的行数。
通常 一个 log文件大概为1G, 那么对应的 index其实也比较多,为了节省空间以使得index
可以放入内存。 kaf使用了稀疏索引。 并不是为每一行都做倒排,读取的时候可能需要小范围的一个遍历。
写的时候,由于一个分区都是有效的,所以会往最后一个文件进行顺序追加,速度是非常快的。
2)副本同步机制ISR
在前面提到,kaf在收到生产者的消息之后,有三种处理方式
1)leader 写入成功之后,就发送ack,这种情况下吞吐最大
2)leader+follower全部写入成功再发送ack,影响吞吐量,并且有些follow的宕机和阻塞可能造成大面积的等待。
3)kaf的 ISR机制,维护一个ISR列表,整个列表中的节点写入成功就返回ack。
ISR机制是kaf的一个特色。kaf的 leader+follower模式,并没有使用 一半投票选举法。 选举方法的劣势是,如果要容忍n台机器的失败,那么必须有2n+1台机器做副本,这个开销是比较大的。
ISR的思想是什么,就是把整个副本中大家比较一致的维护起来,这种一致性代表他们的数据层面比较接近,当有leader挂了之后,就从这个列表里面拿一个作为新的leader,而无需选举。
可以看到,这种情况下,容忍n个节点坏掉,只需要n+1个副本。
那么ISR队列是怎么维护的呢,以三个副本为例进行解释。
一开始,初始状态(a,b,c) 大家都是0,
来了一条消息,写入到 a,这个时候,b,c 作为follower需要一种机制去pull数据。
kaf里面是一个连接去pull leader的数据,为了性能考虑,做一个半长连接,leader有数据会推送过去。
正常情况下,follower都是能同步到数据。如果例外来了,比如宕机重启,网络繁忙,可能拉取不及时,造成了一定时间没有与leader同步。 这个时间是可以设置的,最新的版本kaf就是根据多久没拉数据来决定是否删除 ISR。
最开始的版本,还有一个变量控制,follower与 leader相差多少个消息,后来这个变量取消了。因为生产者可以发送batch,用这个变量不太好度量。
ISR总的来说是处理follower中的例外的,有例外的就踢出,以保证大家基本在一个频道上,这么做的好处,当有leader废掉之后,可以直接从 ISR拿一个作为leader。
假设所有的节点都是健康的,那么写入的时候,实际上是所有follower都需要写入之后才会返回ack。
那么在leaer 与follower同步的这段时间内,有消费者来消费数据,会是个怎么处理的呢?
例如,a=1,2,3 条数,b=1,2,c=1, 各个节点 维护一个坐标叫 highwater,这个值就是提交过的消息的下一个值。
也就是hw =2; 尽管 a的long end =4(下一个值); 消费的数据要保证是大家都有的。 在这个例子里面,如果能消费2,这个2 还是没有提交的。假设 a,b挂了,生产者可会重发,等一切正常之后,消费者可能还是会重复消费2 。
个人认为hw和 long end 是处于这个考虑。
(https://www.cnblogs.com/huxi2b/p/7453543.html)
-
如何做扩展的?
broker节点可以随时加入和退出,那么必须有一个地方存储这些信息,以便集群知道整个情况。
中心化和p2p的模式都可以实现,kaf使用了中心化的模式,大量的配置信息存在了zookeeper这样一个
可以做分布式容错的配置中心。
比如,新增加了一个节点, 他会向zookeeper注册,加入到brokerList里面。
比如,丢失了一个broker, zookeeper会发现这个问题,涉及的后续操作包括是否需要选择新的
controller。controller 决定是否需要为一些丢失的分区选择新的leader。
如果是一些follower丢失宕机了,zookeeper还维护 ISR的列表,会决定ISR的增删改查。
另外kaf可以增加分区和副本数,这些配置信息都是需要zookeeper协调处理的。 -
历史数据会留多久。
kaf的数据都是存在磁盘上的,对于一个topic,可以被不同的消费者组消费,
比如新建一个消费者组,他实际上是可以消费所有保留的历史数据的。
这个历史数据默认是保留7天,可通过设置参数改变默认行为。
消费者高性能+高可用
- 消息太多,如何做到快速消费的
- 增加和减少机器,整个集群会发生什么变化
- 消费者会多次消费数据吗, 断开之后,再续上,消费数据会有什么变化。
- 消费过程中断网会怎么样
- 如何保证数据只消费一次
消费端要解决的问题:
1)单台机器消费要快,支持弹性的扩容,可以根据消息量动态的增加多台机器。
单台机器消费能力的提升其实和product差不多。
- 采用pull的方式拉取数据,可以做一个 long polling, 一定时间之后再timeout。
- 拉取的时候,尽量多拉取数据,比如达到一个pool的size。
- 另外存储端,我们前面说过他的存储,给定一个offset,他是通过索引快速定位到文件行,并通过零拷贝的方式发送数据到网络。
单机的提升总是有限的,那么比如通过扩容机制来处理。比如1个topic 有 5个 分区。 如果只有2个消费者,那么根据kaf的分配风格,两台机器(1,2,3)(4,5),一个分区的数据只能被一个机器消费。所以我们最多可以使用5台机器进行扩容消费。 添加机器的时候,会触发一个rebalance的事件。这个可以通过注册节点到zookeeper由它来协调。比如新加一台机器,消费对应关系(1,2)(3,4)(5)。可以看到这个在平衡的过程后,第二台和新加入的机器是需要把一些信息拿过来的。 比如,第二台机器需要 获取,原来分区-3 最后一个消费的offset。
同理可以推导移除消费者的整个过程。
2)如何做容错的
第一种问题就是消费机器,变慢了或者说是宕机了。 通常可以通过心跳机制来维护消费者的状态。kaf里面可以通过polling信号来作为心跳的一种,比如一定时间没有收到polling请求,就认为该机器不可达,就有做相应的列表移除和rebalance的操作。重连之后,尚未提交的ack 的消息还是可以再次拉取。
这里引出一个问题,就是怎么样的数据才算真的消费,依旧是ack机制,和product一样的概念。
比如来取到数据到客户端之后,就可以自动提交ack;如果这个时候宕机了,该消息未处理就丢失了。
也可以是程序手动提交ack,也就是真正在处理完业务逻辑之后,再发送给服务器端,告诉它某个 offset的消息已经处理完毕。 这个时候如果发送出去的 ack 丢了,那么下次还是可能会拉取到重复数据,如果是对 exactly once要求特别严格的程序,是需要自己做一些额外的工作保证幂等性的。
kaf里面有两种提交方式,一种是auto,大概5秒自动提交一次;一种是手动,一般选择手动提交的比较多,手动里面分布同步和异步,同步可以比较好的处理异常情况,而异步提交,很可能,一个条offset=3,成功,后续一个提交offset=2成功。 异常情况下,顺序性等比较难保证,可能会造成重复消费数据。
3) 其他一些定制化的tips
提交offset的时候,默认是提交polling 拿到的最新的一个消息偏移,当整个batch比较大的时候,我们可能希望消费一部分就尽可能的提交一部分,避免一个宕机前功尽弃。 consumer的api 提供了指定offset的提交方式。
polling的时候,也可以指定具体的offset的位置,可以决定从头开始,还是从具体的某个特定位置开始。
4)精确的一次消费
幂等性在kaf里面是一个很大的话题,后面单独讨论。
1.Producer如何来实现。
构建一个 唯一 id 。(pid,topic,partion, seq)
服务端,把这个信息存起来。
解决的问题: 单个分区发送的信息,有序无重复。
失败的例子-1)
如果这个分区全部挂了,pid写入到其他分区。
然后,这个分区又好了。 里面存在了两份数据。
造成了重复。
失败的例子-2)
producer挂了, 假设业务上保存了最后一数据(msg+ seq)
那么如果 知道自己不变的 pid, 也是可以恢复的。
但是应为是分布式,要如何保存这个pid 唯一不变呢?
如果机器挂了,换一条,必须要取个不一样的名字,如果一样,等恢复了怎么办?
所以这里行不通。
目前kafka,的单个分区,单个session 模式能保证 exactly once。
如果要多份区呢?
- 事务ID
通过上面的推理,我们必须要保障一个 全局的ID 来决定一个全局的 唯一ID
要跨分区,必须要有一个地方存储中心化的信息吧?
不然如果解决 失败的例子-1) 这种情况呢?
把(transId,prid,topic,seq) 作为一个id。就不用考虑分区了。
存在kafka的一个公共队列里面。
貌似加了 生成这边就没什么问题了。。。
但是更难的问题是, 如果保障 多个写,读,混合的操作,满足事务的语义
要么全部提交,要么全部abort。
这个就涉及到 commitOffset 和 readOffset的处理和协调问题。
看不太懂= TODO
(http://www.jasongj.com/kafka/transaction/)
设计上为什么是优秀的,有哪些亮点可以借鉴与分布式系统中。
- 分布式核心,分区,副本,一致性。
- 文件存储的高性能,顺序读写,分块索引
- master-slave 简单的容错机制
扩展的基础知识
- RPC 框架与 异步RPC实现原理
- push和poll模式区别,pull 模式 半长连接的效率提升
https://www.jianshu.com/p/6e90c2f2e463
核心,使用poll 模式,hold住一个长连接,有数据之后就返回。或者是等待超时就返回,尽可能的让每一次pull的消耗都是有价值的。 - 分布式的难题, exactly once。
网友评论