美文网首页
Kafka可靠性机制

Kafka可靠性机制

作者: 无暇的风笛 | 来源:发表于2021-12-14 14:01 被阅读0次

    一、组件

    介绍一下kafka进行数据复制时会涉及到的一些组件概念

    1. zookeeper:维护集群信息,当broker加入或退出时,kafka通过订阅zookeeper就能获得通知

    2. broker:一个独立的kafka服务器称为一个broker。broker接收来自生产者的消息,为消息设置位移,并将消息刷入到磁盘里。broker并且提供消费者服务,对读取的分区数据提供响应。

    3. 控制器/Controller:除了有一般broker的功能外,还会负责分区首领的选举,使用epoch来控制“脑裂”。

      集群里第一个启动的broker通过在Zookeeper里创建一个临时节点/controller使自己成为控制器,其他的broker节点在启动时也会尝试创建这个节点,但会提示失败,因为已经存在了,其它broker节点会在Zookeeper创建/watcher节点去感知控制器的状态,当控制器被关闭或者离开集群了,他们会再次尝试创建/controller节点重复同样的操作。

      新选举出来的控制器,会得到一个递增的controller epoch,其它broker在得知当前的controller epoch后,会忽略旧控制器发出的消息,避免了脑裂的现象。

      控制器可以进行broker分区选举。当分区首领所在的broker离开集群时,控制器遍历这些分区,并确定哪个副本会成为新的分区首领,然后向所有broker发送请求,该请求包含谁是新leader谁是follower,随后新首领开始处理来自生产者和消费者的请求,而follower开始从leader处复制消息

    4. 分区:kafka使用主题来组织数据,每个主题被划分为若干个分区,每个分区可以有若干个副本,分区分配遵循同一分区副本均匀分布在不同broker上。

      例如有4个broker,创建一个包含10个分区的主题,复制因子设置为2,那么总共有20个副本,可以按照如下方式分配给4个broker:

      1、若未指定机架信息,随机指定一个broker0,首领分区0分配给broker0,首领分区1分配给broker1,以此类推......随后从分区首领后开始,依次分配跟随者副本,例如分区0的首领在broker0,那么它的第一个跟随者副本会分配给broker1......

      2、若指定了机架信息,例如broker0和broker1在机架1,broker2和broker3分别在不同的机架,那么分区副本需要按照broker0,broker2,broker1,broker3进行交替分配

    5. 副本:分为首领(leader)副本和跟随者(follower)副本。

      • leader副本处理所有的写入和访问请求,另外会通过与follower保持状态的交互,维护一个isr列表;

        broker在处理请求时,如果收到一个包含特定分区的生产和读取请求,但是该分区的leader副本并不在该broker上,会导致报错。

        客户端会采用元数据请求方式,服务器会给出对应的响应,响应的消息会指明特定的主题,主题的分区、分区的副本以及副本leader信息,然后客户端会缓存起来便于下次直接访问。并会是不是更新元数据信息

      • follower的任务是复制leader的消息,保持与leader的一致性

      • ISR机制:每个分区都有一个ISR列表,用于维护所有的同步副本。leader副本必须是同步的,follower副本要满足两个条件才算是同步副本:

        1. 定时向zk发送心跳消息,保持与zk的活跃会话
        2. 持续向leader副本请求消息,在允许的消息量/时间延迟范围内保持与leader副本的消息同步(副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值)
    6. LEO:日志末端位移,记录每个副本中下一条消息的偏移量

    7. HW:水位值,记录当前topic已提交的偏移量。也即消费者能消费到的最大偏移量

    8. Leader Epoch

    二、消息的可靠性传递

    1. broker有3个配置可影响消息存储可靠性

      1. 复制系数:主题级别的配置参数是replication.factor,broker级别可以通过default.replication.factor来配置自动创建的主题;更高的复制系数可以带来更高的可用性、可靠性,但是也需要消耗更多的存储空间

      2. 不完全的首领选举:unclean.leader.election只能在broker级别配置,默认值是enable。

        当分区首领不可用时,一个同步副本会被选为新首领,如果在选举过程中没有出现数据丢失,那么这个选举就是完全的。如果允许不同步的副本成为分区首领,那么需要承担丢失数据和数据不一致的风险,如果不允许,那么就要接受较低的可靠性

      3. 最小同步副本:主题和broker级别上都可以配置参数min.insync.replicas参数,如果当前同步副本的个数小于这个参数时,那么生产者将不能往主题分区写入数据,分区也变成了只读状态。

    2. 生产者配置

      1. 发送确认配置:acks可配置3中不同的确认模式
        • acks=0:生产者能够把消息发送出去,则认为消息已成功写入kafka,这种配置可以得到最大的吞吐量带宽利用率,但是却最不稳定最有可能丢失数据
        • acks=1:分区首领在收到数据后写入分区数据文件时会返回确认或者失败的消息,如果生产者能正确处理错误消息,会重试尝试发送消息,最终消息会成功写入到分区首领。这种配置方式也有造成丢失数据的风险,当消息写入分区leader但是在follower复制时leader崩溃了
        • acks=all:生产者在消息写入分区首领和所有的副本后才确认消息被写入,这个参数会配合最小同步副本来使用,在确认最小写入副本数成功后就能返回继续处理下一条消息的继续写入。这种配置可靠性最高,但是吞吐率最低
      2. 配置重试次数:对于可重试解决错误的事件,生产者可以尝试重新发送消息;对于不可重试解决错误的事件,多次重试已失去意义,可以直接丢弃或保存到磁盘再后续处理。重试次数的配置主要看重试的目的是什么。
      3. 额外的错误处理:对于重试机制不能解决的错误,例如消息序列化失败,生产者重试次数达到上限,需要开发人员自行捕获异常并处理。
    3. 消费者可靠性配置

      1. 自动提交偏移量

        • enable.auto.commit(消费者再均衡后会有消息重复消费的情况)
        • auto.commit.interval.ms(自动提交开启,默认提交间隔是5s)
      2. 手动提交偏移量

        enable.auto.commit参数设置为false,手动提交偏移量分两类

        • 手动提交当前轮训的最大偏移量
        • 手动提交固定偏移量

        api分同步提交和异步提交两类

        • 同步提交:提交失败消息后阻塞,消费者进行自动重试,保证消息能够最大限度地提交成功,但会降低吞吐量

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record);
              }
              /*同步提交*/
              consumer.commitSync();
          }
          
        • 异步提交:提交失败后不能自动重试,但是可以通过一个Map<TopicPartition, Integer> offsets对象来维护每个分区提交的偏移量,如果失败的偏移量小于最后一次已提交的偏移量,则不需要重试

          while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
              for (ConsumerRecord<String, String> record : records) {
                  System.out.println(record);
              }
              /*异步提交并定义回调*/
              consumer.commitAsync(new OffsetCommitCallback() {
                  @Override
                  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                       offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
                                                                      x.topic(), x.partition(), y.offset()));
                      }
                  }
              });
          }
          

    相关文章

      网友评论

          本文标题:Kafka可靠性机制

          本文链接:https://www.haomeiwen.com/subject/wkmewltx.html