美文网首页
Kafka中的客户端

Kafka中的客户端

作者: 就这些吗 | 来源:发表于2020-03-29 17:33 被阅读0次

    阅读以下内容你将了解到:
    1.消费者的分区分配策略
    2.消息中间件的传输保障层级
    3.Kafka中的幂等(幂等只对单个生产者中单分区生效)
    4.4.Kafka中的事务(实现流程)
    5.事务在生产者与消费者的语义区别

    1.消费者的分区分配策略

    RangeAssignor分配策略
    分区数整除以消费者总数来分配,如果有余数,那么靠前的消费者会多分配一个分区
    带来的问题:比如有消费者C0,C1,订阅了2个主题,且这2个主题都只有3个分区,分配结果为:
    消费者C0:t0p0,t0p1,t1p0,t1p1(t0p0意为topic编号为0,此topic的partition编号为0的分区,下同)
    消费者C1:t0p2,t0p2
    很明显会造成分配不均匀的情况。

    RoundRobinAssignor分配策略
    将消费者组内所有消费者及消费者订阅的主题排序,轮询将分区分配给消费者。
    看似很完美的分配策略,但是如果一个消费组内的消费者订阅信息不同,还是会造成分配不均匀。
    举个栗子:
    某消费者组内有3个消费者(C0,C1,C2),C0订阅了t0,C1订阅了t0和t1,C2订阅了t0、t1和t2,这三个主题分别有1、2、3三个分区。那么最终结果为
    消费者C0: t0p0(t0p0意为topic编号为0,此topic的partition编号为0的分区,下同)
    消费者C1: t1p0
    消费者C2: t1p1、t2p0、t2p1、t2p2

    StickyAssignor分配策略
    此分配策略有两个目的
    1.分区分配尽可能均匀
    2.分区分配尽可能与上次分配的保持相同(当与第一点冲突时,优先第一点)

    这种分区策略的代码实现异常复杂,但是也更加优异,其中一个优点就是使分区分配具有“粘性”,减少不必要的分区移动

    当然我们也可以自定义其他的分配策略,通过自定义逻辑,可以做到一个分区在组内广播的逻辑。
    注意:每个消费者都可以设置自己的分区分配策略,一个消费组内的分区分配策略可以看做被各个消费者支持的最多的策略。

    2.消息中间件的传输保障层级

    一般而言,对于消息中间件(注意不只包括Kafka)的消息传输保障有三个层级
    1.at most once:至多一次。消息可能会丢失,但是绝对不会重复传输。
    (类比于消费者先提交消费后进行消息处理,中间消费者宕机的情况)
    2.at least once:至少一次,消息绝不会丢失,但可能会重复传输。
    (类比于Kafka中的重试机制,或者消费者先处理消息后提交消费,中间消费者宕机的情况)
    3.exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。
    (在0.11.0.0版本开始引入幂等和事务的概念,以此来实现“恰好一次”)

    3.Kafka中的幂等

    实现幂等的原理和注意的点:
    每个生产者实例在初始化的时候回分配到一个PID,对于每个PID来说,消息发送到每一个分区都有对应的序列号,broker会在内存中会有<PID,分区>——序列号的映射关系.

    此时,对于每一条消息,我们可以比较他的序列号值(SN_new)比broker中对于的序列号刚好大1,才会被接收,如果SN_new<SN_old+1,说明消息被重复写入,直接丢弃即可。如果SN_new>SN_old+1,则可能有消息丢失,这是比较严重的问题,会直接报错异常。

    注意:阅读上面一段文字时,你应该注意到实现幂等是对于每一对<PID,分区>而言的,也就是说,Kafka的幂等只能保证单个生产者会话中单分区的幂等。

    4.Kafka中的事务

    幂等性并不能跨分区运作,而事务可以弥补这个缺陷。

    事务可以保证多个分区写入操作的原子性。(事务要求生产者开启幂等特性)
    Kafka中的事务可以使应用程序将消息消费、生产消息、提交消费位移当做原子操作来处理,同时成功或失败,即时该生产或消费会跨多个分区。(需要应用程序提供唯一的transactionlId)
    transactionlId与上文幂等中提到的PID类似,不过transactionlId是由我们来显示的设置,而PID是Kafka内部分配的。需要保证新的生产者启动后具有相同的transactionlId的旧生产者立即失效。(也就是说,事务是针对一个生产者多个分区的)
    transactionlId主要用来找到特定的事务协调器

    具体实现如下:
    每个生产者都会被指派一个特定的TransactionCoordinator(事务协调器),这个协调器会把事务的状态持久化到内部主题_transaction_state中。
    1.先通过transactionlId找到自己的TransactionCoordinator(通过对transactionlId进行hash取模找到对应的分区,再通过分区leader副本来确定broker节点)
    2.为当前生产者分配一个PID,并将transactionlId和对应的PID保存到主题_transaction_state中 (注意,如果没有开启事务而只开启了幂等,任意的broker都可以来分配PID)
    3.开启事务, 接下来才是重头戏,当生产者给一个新的分区发数据前,会把<transactionlId,TopicPartition>的对应关系存储在主题_transaction_state中,有了这个对照关系我们就可以在后续的步骤里为每个分区设置COMMIT或ABORT标记。
    4.通过sendOffsetsToTransaction()方法可以在一个事务批次里处理消息的消费和发送
    5.一旦消息被写入成功(失败),就可以调用commitTransaction()方法或者abortTransaction()方法来结束当前事务。
    此时就准备进行事务最后的操作了。
    (1)将PREPARE_COMMIT或PREPARE_ABORT消息写入主题_transaction_state(用来存储事务的状态)
    (2)将COMMIT或ABORT信息写入用户所使用的普通主题和_consumer_offsets
    (3)将COMMIT_COMMIT或者COMPLETE_ABORT信息写入内部主题
    _transaction_state。此时便可以删除主题_transaction_state中所有关于该事务的消息了。(这个主题中采用的清理策略为日志压缩,将相应消息设置为墓碑消息即可)

    再简单梳理一遍:事务的实现需要有一个特定的事务协调器来实现,还会把transactionlId(主要用来找到特定的事务协调器)、PID(用来确定唯一的分区)、TopicPartition(事务中发送消息的频道和分区)之间的映射关系保存下来,此时就可以开始事务了,等消息被成功(失败)写入,来COMMIT或者ABORT。

    以COMMIT为例,先要在_transaction_state设置PREPARE_COMMIT的标记以后再来COMMIT普通主题(这样做是为了保证事务的持久性,如果断电后仍可以保证这个事务再Kafka重启后被正确提交),等普通消息都COMMIT提交完成后再将_transaction_state设置设置为COMMIT_COMMIT。此时便可以将关于这个事务的消息删除了(这个主题中采用的清理策略为日志压缩,将相应消息设置为墓碑消息即可)。

    5.事务在生产者与消费者的语义区别

    通过事务,Kafka可以保证生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。
    而对于消费者而言,事务的语义相对偏弱。Kafka并不能保证已提交的事务的中所有消息都能被消费:
    因为我们的事务只保证写入的成功。

    1.对于采用日志压缩的主题而言,事务中某些消息可能被清理(相同的key消息后来的消息覆盖前面的消息)
    2.对于采用日志删除的主题而言,事务中消息可能分布在不同的日志分段,当老的日志分段被删除时候,相应的消息也会丢失。
    3.消费者通过seek()访问到任意offset的消息,从而可能遗漏事务中的部分消息(比如从事务的中间段开始消费)
    4.消费者再消费的时候没有被分配到事务内的所有分区。

    相关文章

      网友评论

          本文标题:Kafka中的客户端

          本文链接:https://www.haomeiwen.com/subject/uffyuhtx.html