美文网首页
kafka 学习笔记

kafka 学习笔记

作者: NazgulSun | 来源:发表于2020-08-10 18:52 被阅读0次

    主要是想学习一个高性能的,高可用的分布式消息系统是如何设计和考虑的?
    先写提纲,后面自问自答

    生产者的高性能如何保障?

    • 如何把海量的数据快速发送完毕
    • 如何确保数据被接收
    • 发送过程中,网络异常和抖动怎么办
    • 如果重复发送了数据怎么处理?
      设计一个client 要考虑的问题:
    1. 异常处理的能力。
      由于服务端是一个和黑子,很可能存在诸多的问题,比如网络性能,网络异常,服务器宕机等,
      需要在诸多的不确定下,实现良好的异常处理机制。
      1)如何确保,消息一定被接收到。应答ack是一个常用的方法。对于发送出去的消息,kaf 放在一个inflightRequest队列里面。
      如果收到客户端的ack,就视为真正的发送成功,否则则启动重试策略,比如一段时间没有收到ack就可以发起重试。
      当然是否需要ack,也是可以配置的。根据具体的需求,kaf可以设置几种ack的强度。0,不需要。1,需要leader回复。-1,需要 leader +ISR 回复。
      这在存储的高可用性的时候会有详细的涉及。

    2. 高效的生产能力。
      比如对接用户行为,可能行为流数据源源不断的需要写入到kafka,这个时候对client的性能上是有要求的。
      从设计高性能的client端来看, client 的性能消耗可能存在的地方:
      1) 消息预处理序列化成字节码
      2) 构建网络连接,发送字节码。
      涉及到网络发送的性能优化, 一个是建立长连接,避免不断建立连接消耗时间和系统资源;
      使用batch的模式,一次多发;使用零拷贝的技术,尽量不要多次从 用户态到内核太的拷贝数据。
      使用零拷贝,可以收到数据socket-发送数据socket直接拷贝。

    kafka client,基本上用到了上面提到的tip。
    kafka是一个异步发送的模型, 需要发送的数据,先存在本地的一个buffPool里面,
    然后等待一段时间,或者达到一定数量,就一起发送,类似于大巴车待客出发的模式。
    调用client.close()会强制刷新

    1. 自动处理服务端变化的能力。
      服务端1开始10个服务器,后来扩展到20个服务器,client需要有自动rebalance的能力。
      kafka 提供了几种模式,可以指定分区,也可以不指定, 不指定的时候,需要获取集群的metadata,
      然后根据meta的信息来决定待发送的 leader是否 alive等。
      上面说的是扩容的情况,其实服务端的变化,还包含宕机和恢复。 kafka都可以通过metadata接口来感知,
      并作出相应的变化。

    2. 如果product 自己宕机了怎么办?
      现有文档对这个问题涉及的比较少,只是在 exactly once 有相关的论述。
      如果product 发送A 的过程中,自己宕机了,那么恢复之后如何知道,这条A 是否需要从新发送呢?
      可以记录A消息是否收到ACK,如果收到不重发,如果不收到重发。
      但是有可能ACK是中途丢失了了,重复发送,kafka 就可能收到两份A重复的。 这就比较麻烦了,
      这个时候需要给消息一个全局的ID,kafka判断A是否已经重复。
      所以说 client 无法做 exactly once的,只能靠kafka 去判断,两者还需配合。

    接收者高性能+ 高可用如何保障

    • 是如何存储数据的
    • 是如何做扩展的
    • 数据一致性是如何保证的
    • 分片和副本是如何做到高可用的
    • 读/写是如何分别做到高性能的
    • 历史数据会永远保存吗?
    1. 先看存储的具体骚操作
      从写入1台,到写入 3台server。
      分布式系统的三宝,分片,副本,一致性。
      比如有个topA, 那么他会存在 3台服务器上,都是以文件的方式存储。
      A-1, 2: A-2, 3:A-3;
      分片的方法就是大规模横向扩展, 这样就能更具需求加机器,每个机器都分担一点压力。

    我们知道,在只有一台存储服务器的时候,我们需要给他做备份,在分布式里面也一样。
    每一个分片可以类比与一台电脑,我们需要给他做一个备份,专业术语就是副本。

    副本高效的同步一直是分布式系统的一大难题,而 leader-follower 模型是相对简单的,在leader 宕机的时候采用一半选举机制来选出新的leader。
    首先,我们要明确leader的作用,就是一切read/write的接口,follow只负责做备份,不对外提供服务。

    kaf的存储方式非常值得借鉴,如何实现高效的文件读写系统。
    比如,要写入a,b,c,d,e,f,g 7条数据到1个分区我们可以看看是如何实现的。
    原理:要想读的快,必须要靠索引。
    0.log, 文件存放 a,b
    3.log, 存放 c,d
    5.log, 存放 e,f
    7.log,存放 g。

    现在需要消费 f,offset = 6 的数据。
    总不可能对所有文件进行扫描,这个时候就需要索引文件了。
    0.index,3. index. 5.index, 7.index
    index 的文件名为 offset 的偏移量。
    比如offset = 6,的数据一定是在 5.index 里面,这里存了5-7的数据。
    锁定index文件,是一个二分查找。
    那么 5.index 的内容又是什么呢?
    5-> 第一行
    6-> 第二行
    index 文件肯定是倒排索引了,这个索引是 log 文件的行数。
    通常 一个 log文件大概为1G, 那么对应的 index其实也比较多,为了节省空间以使得index
    可以放入内存。 kaf使用了稀疏索引。 并不是为每一行都做倒排,读取的时候可能需要小范围的一个遍历。

    写的时候,由于一个分区都是有效的,所以会往最后一个文件进行顺序追加,速度是非常快的。

    2)副本同步机制ISR
    在前面提到,kaf在收到生产者的消息之后,有三种处理方式

    1)leader 写入成功之后,就发送ack,这种情况下吞吐最大

    2)leader+follower全部写入成功再发送ack,影响吞吐量,并且有些follow的宕机和阻塞可能造成大面积的等待。

    3)kaf的 ISR机制,维护一个ISR列表,整个列表中的节点写入成功就返回ack。

    ISR机制是kaf的一个特色。kaf的 leader+follower模式,并没有使用 一半投票选举法。 选举方法的劣势是,如果要容忍n台机器的失败,那么必须有2n+1台机器做副本,这个开销是比较大的。

    ISR的思想是什么,就是把整个副本中大家比较一致的维护起来,这种一致性代表他们的数据层面比较接近,当有leader挂了之后,就从这个列表里面拿一个作为新的leader,而无需选举。

    可以看到,这种情况下,容忍n个节点坏掉,只需要n+1个副本。

    那么ISR队列是怎么维护的呢,以三个副本为例进行解释。

    一开始,初始状态(a,b,c) 大家都是0,

    来了一条消息,写入到 a,这个时候,b,c 作为follower需要一种机制去pull数据。

    kaf里面是一个连接去pull leader的数据,为了性能考虑,做一个半长连接,leader有数据会推送过去。

    正常情况下,follower都是能同步到数据。如果例外来了,比如宕机重启,网络繁忙,可能拉取不及时,造成了一定时间没有与leader同步。 这个时间是可以设置的,最新的版本kaf就是根据多久没拉数据来决定是否删除 ISR。

    最开始的版本,还有一个变量控制,follower与 leader相差多少个消息,后来这个变量取消了。因为生产者可以发送batch,用这个变量不太好度量。

    ISR总的来说是处理follower中的例外的,有例外的就踢出,以保证大家基本在一个频道上,这么做的好处,当有leader废掉之后,可以直接从 ISR拿一个作为leader。

    假设所有的节点都是健康的,那么写入的时候,实际上是所有follower都需要写入之后才会返回ack。

    那么在leaer 与follower同步的这段时间内,有消费者来消费数据,会是个怎么处理的呢?

    例如,a=1,2,3 条数,b=1,2,c=1, 各个节点 维护一个坐标叫 highwater,这个值就是提交过的消息的下一个值。

    也就是hw =2; 尽管 a的long end =4(下一个值); 消费的数据要保证是大家都有的。 在这个例子里面,如果能消费2,这个2 还是没有提交的。假设 a,b挂了,生产者可会重发,等一切正常之后,消费者可能还是会重复消费2 。

    个人认为hw和 long end 是处于这个考虑。

    https://www.cnblogs.com/huxi2b/p/7453543.html

    1. 如何做扩展的?
      broker节点可以随时加入和退出,那么必须有一个地方存储这些信息,以便集群知道整个情况。
      中心化和p2p的模式都可以实现,kaf使用了中心化的模式,大量的配置信息存在了zookeeper这样一个
      可以做分布式容错的配置中心。
      比如,新增加了一个节点, 他会向zookeeper注册,加入到brokerList里面。
      比如,丢失了一个broker, zookeeper会发现这个问题,涉及的后续操作包括是否需要选择新的
      controller。controller 决定是否需要为一些丢失的分区选择新的leader。
      如果是一些follower丢失宕机了,zookeeper还维护 ISR的列表,会决定ISR的增删改查。
      另外kaf可以增加分区和副本数,这些配置信息都是需要zookeeper协调处理的。

    2. 历史数据会留多久。
      kaf的数据都是存在磁盘上的,对于一个topic,可以被不同的消费者组消费,
      比如新建一个消费者组,他实际上是可以消费所有保留的历史数据的。
      这个历史数据默认是保留7天,可通过设置参数改变默认行为。

    消费者高性能+高可用

    • 消息太多,如何做到快速消费的
    • 增加和减少机器,整个集群会发生什么变化
    • 消费者会多次消费数据吗, 断开之后,再续上,消费数据会有什么变化。
    • 消费过程中断网会怎么样
    • 如何保证数据只消费一次

    消费端要解决的问题:
    1)单台机器消费要快,支持弹性的扩容,可以根据消息量动态的增加多台机器。

    单台机器消费能力的提升其实和product差不多。

    • 采用pull的方式拉取数据,可以做一个 long polling, 一定时间之后再timeout。
    • 拉取的时候,尽量多拉取数据,比如达到一个pool的size。
    • 另外存储端,我们前面说过他的存储,给定一个offset,他是通过索引快速定位到文件行,并通过零拷贝的方式发送数据到网络。

    单机的提升总是有限的,那么比如通过扩容机制来处理。比如1个topic 有 5个 分区。 如果只有2个消费者,那么根据kaf的分配风格,两台机器(1,2,3)(4,5),一个分区的数据只能被一个机器消费。所以我们最多可以使用5台机器进行扩容消费。 添加机器的时候,会触发一个rebalance的事件。这个可以通过注册节点到zookeeper由它来协调。比如新加一台机器,消费对应关系(1,2)(3,4)(5)。可以看到这个在平衡的过程后,第二台和新加入的机器是需要把一些信息拿过来的。 比如,第二台机器需要 获取,原来分区-3 最后一个消费的offset。
    同理可以推导移除消费者的整个过程。

    2)如何做容错的
    第一种问题就是消费机器,变慢了或者说是宕机了。 通常可以通过心跳机制来维护消费者的状态。kaf里面可以通过polling信号来作为心跳的一种,比如一定时间没有收到polling请求,就认为该机器不可达,就有做相应的列表移除和rebalance的操作。重连之后,尚未提交的ack 的消息还是可以再次拉取。

    这里引出一个问题,就是怎么样的数据才算真的消费,依旧是ack机制,和product一样的概念。

    比如来取到数据到客户端之后,就可以自动提交ack;如果这个时候宕机了,该消息未处理就丢失了。

    也可以是程序手动提交ack,也就是真正在处理完业务逻辑之后,再发送给服务器端,告诉它某个 offset的消息已经处理完毕。 这个时候如果发送出去的 ack 丢了,那么下次还是可能会拉取到重复数据,如果是对 exactly once要求特别严格的程序,是需要自己做一些额外的工作保证幂等性的。

    kaf里面有两种提交方式,一种是auto,大概5秒自动提交一次;一种是手动,一般选择手动提交的比较多,手动里面分布同步和异步,同步可以比较好的处理异常情况,而异步提交,很可能,一个条offset=3,成功,后续一个提交offset=2成功。 异常情况下,顺序性等比较难保证,可能会造成重复消费数据。

    3) 其他一些定制化的tips
    提交offset的时候,默认是提交polling 拿到的最新的一个消息偏移,当整个batch比较大的时候,我们可能希望消费一部分就尽可能的提交一部分,避免一个宕机前功尽弃。 consumer的api 提供了指定offset的提交方式。
    polling的时候,也可以指定具体的offset的位置,可以决定从头开始,还是从具体的某个特定位置开始。

    4)精确的一次消费

    幂等性在kaf里面是一个很大的话题,后面单独讨论。
    1.Producer如何来实现。

    构建一个 唯一 id 。(pid,topic,partion, seq)
    服务端,把这个信息存起来。
    解决的问题: 单个分区发送的信息,有序无重复。

    失败的例子-1)
    如果这个分区全部挂了,pid写入到其他分区。
    然后,这个分区又好了。 里面存在了两份数据。
    造成了重复。

    失败的例子-2)
    producer挂了, 假设业务上保存了最后一数据(msg+ seq)
    那么如果 知道自己不变的 pid, 也是可以恢复的。
    但是应为是分布式,要如何保存这个pid 唯一不变呢?
    如果机器挂了,换一条,必须要取个不一样的名字,如果一样,等恢复了怎么办?
    所以这里行不通。

    目前kafka,的单个分区,单个session 模式能保证 exactly once。

    如果要多份区呢?

    1. 事务ID
      通过上面的推理,我们必须要保障一个 全局的ID 来决定一个全局的 唯一ID

    要跨分区,必须要有一个地方存储中心化的信息吧?
    不然如果解决 失败的例子-1) 这种情况呢?

    把(transId,prid,topic,seq) 作为一个id。就不用考虑分区了。
    存在kafka的一个公共队列里面。

    貌似加了 生成这边就没什么问题了。。。

    但是更难的问题是, 如果保障 多个写,读,混合的操作,满足事务的语义
    要么全部提交,要么全部abort。
    这个就涉及到 commitOffset 和 readOffset的处理和协调问题。
    看不太懂= TODO
    http://www.jasongj.com/kafka/transaction/

    设计上为什么是优秀的,有哪些亮点可以借鉴与分布式系统中。

    • 分布式核心,分区,副本,一致性。
    • 文件存储的高性能,顺序读写,分块索引
    • master-slave 简单的容错机制

    扩展的基础知识

    • RPC 框架与 异步RPC实现原理
    • push和poll模式区别,pull 模式 半长连接的效率提升
      https://www.jianshu.com/p/6e90c2f2e463
      核心,使用poll 模式,hold住一个长连接,有数据之后就返回。或者是等待超时就返回,尽可能的让每一次pull的消耗都是有价值的。
    • 分布式的难题, exactly once。

    相关文章

      网友评论

          本文标题:kafka 学习笔记

          本文链接:https://www.haomeiwen.com/subject/oqajdktx.html