美文网首页
关于消费者

关于消费者

作者: kar_joe | 来源:发表于2020-01-11 21:28 被阅读0次

    消费模型

    • Kafka 的每个 Consumer(消费者)实例属于一个 ConsumerGroup(消费组);
    • 在消费时,ConsumerGroup 中的每个 Consumer 独占一个或多个 Partition(分区);
    • 对于每个 ConsumerGroup,在任意时刻,每个 Partition 至多有 1 个 Consumer 在消费;
    • 每个 ConsumerGroup 都有一个 Coordinator(协调者)负责分配 Consumer 和 Partition 的对应关系,当 Partition 或是 Consumer 发生变更时,会触发 rebalance(重新分配)过程,重新分配 Consumer 与 Partition 的对应关系;
    • Consumer 维护与 Coordinator 之间的心跳,这样 Coordinator 就能感知到 Consumer 的状态,在 Consumer 故障的时候及时触发 rebalance。
    • 为加速消费,可以增加topic分区并增加消费者实例
    • 既支持点对点又支持订阅/发布模型

    消费者内部线程模型

    KafkaConsumer 采用双线程的设计,即用户主线程和心跳线程。所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性
    引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
    KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中。KafkaConsumer 中有个方法是例外的,它就是 wakeup(),你可以在其他线程中安全地调用 KafkaConsumer.wakeup() 来唤醒 Consumer

    消费者业务线程模型

    为了加速消费,提高并行性,消费者端引入多线程。

    多消费实例

    消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。


    image.png

    单消费实例,多业务线程

    消费者程序使用单消费实例,但创建多个消费线程并行消费。实现难度较大,容易造成乱序。


    image.png

    多消费示例,每个实例又多消费线程

    该方案是上述两种方案的整合


    image.png

    重平衡reblance

    Coordinator 是消费位移消息所提交的分区的leader所在broker,负责消费者组的组成员管理和各个消费者的位移提交管理。
    某个组的所有消费者保持向该组对应的Coordinator 发送心跳,heartbeat.interval.ms既设置了心跳间隔,也控制重平衡通知的频率。重平衡的通知机制也是通过心跳线程来完成的。当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

    出现场景

    • consumer发送心跳(heartbeat.interval.ms)超时(session.timeout.ms),被Coordinator 踢出组;
    • consumer消费太久poll超时(max.poll.interval.ms),自己主动退组
      缺点:不仅导致reblance,再提交offset时还会遇到CommitFailedException异常
      解决办法:控制一次性拉取的消息数(max.poll.records)、多线程、控制业务逻辑复杂度;
    • topic变动
    • topic的分区变动

    负面影响

    • 所有消费者需要暂停消费
    • 任务重新分配,tcp连接也要重新建立
    • 可能导致消费者offset提交异常,导致重复消费
    • 整个过程图很慢,浪费系统性能

    重平衡状态机

    kafka借助于状态机实现重平衡


    状态含义
    状态切换

    当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。在 Kafka 的日志中一定经常看到下面这个输出:Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.这就是 Kafka 在尝试定期删除过期位移。只有 Empty 状态下的组,才会执行过期位移删除的操作。

    重平衡过程

    在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
    当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
    目前服务端做成无状态,既是优点也是缺点,优点是降低了服务端的维护成本,但是缺点是每次重平衡,历史分区信息都要临时搜集,过程复杂,而且所有消费者都要停下手中工作并参与重平衡。并且之前任务重分配也不考虑历史分配,在0.11.0.0版本才引入粘性冲平衡策略。

    1. 消费者端


      image.png
      image.png
    2. 协调者端
    • 新成员入组


      image.png
    • 组成员主动离组


      image.png
    • 组成员崩溃离组


      image.png
    • 重平衡时协调者对组内成员提交位移的处理


      image.png

    相关文章

      网友评论

          本文标题:关于消费者

          本文链接:https://www.haomeiwen.com/subject/qtubactx.html