Kafka(六)深入原理--架构组成及生产者Producer

作者: 我犟不过你 | 来源:发表于2021-02-05 16:10 被阅读0次

    在kafka中有几个重要的组成:broker,producer,consumer(consumer group),zookeeper,topic。我们现在针对每个组件去单独介绍。

    一、模块组成

    kafka的架构如下图所示:

    kafka架构.jpg

    上图中包含了一个kafka集群的所有组件
    1)三台broker集群
    2)三台zookeeper集群
    3)一个生产者producer
    4)一组消费者consumer group
    5)一个单独的消费者consumer3
    6)一个topic1,其中partition是2,表示每个topic有两个分区,replica是3,表示每个分区有三个副本,分布在每台broker上。

    蓝色消息发送的流程
    1)producer发送蓝色消息到topic1;
    2)假设发送到broker1的topic1中的partition1中,此时这个partition1自动成为了当前partition三个副本replicas的leader,则broker2和broker3的两个partition1自然的成为follower;
    3)由当前的replica leader负责当前分区消息的读和写,另外两个分区follower会从leader同步消息。
    4)consumer group的consumer1去消费partition1的leader中的消息,则consumer2是不能消费的;同一组内只有一个consumer可以消费消息。

    红色消息发送的流程
    1)producer发送蓝色消息到topic1;
    2)此时发送到broker2的topic1中的partition2中,此时这个partition2自动成为了当前partition三个副本replicas的leader,则broker1和broker3的两个partition2自然的成为follower;
    3)由当前的replica leader负责当前分区消息的读和写,另外两个分区follower会从leader同步消息。
    4)consumer3去消费partition2的leader中的消息。

    再简单模拟了消息的分布和生产消费过程后,我们具体说明每个组成的功能
    1)producer:消息生产者,向kafka的broker发送消息的客户端。
    2)consumer: :消息消费者,向 kafka broker拉取消息的客户端;
    3)consumer group:消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    4)broker:一台kafka服务器就是一个broker,一个集群在有多个broker。一个broker内可存放逗哥topic。
    5 )topic :可以理解为一个队列, 生产者和消费者面向的都是一个 topic。
    6 )partition :一个非常大的topic可以分布到多个broker上,一个 topic分为多个partition,每个partition是一个有序的队列。
    7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,使得kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
    8 )leader:每个分区多个副本的主节点,负责数据的读写,即生产者发送数据的对象,以及消费者消费数据的对象都是leader。
    9 )follower:每个分区多个副本中的从节点,实时从leader中同步数据,保持和leader数据
    同步。leader发生故障时,某个follower会成为新的leader。

    二、kafka的存储机制

    Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘存储数据的。

    Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic存储结构见下图:

    kafka存储机制.jpg

    由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片索引机制,将每个partition分为多个segment。每个 segment对应两个文件“.index”文件和“.log”文件。

    partition文件夹命名规则:
    topic 名称+分区序号,举例有一个topic名称文“kafka”,这个topic有三个分区,则每个文件夹命名如下:

    kafka-0
    kafka-1
    kafka-2
    

    index和log文件的命名规则:
    1)partition文件夹中的第一个segment从0开始,以后每个segement文件以上一个segment文件的最后一条消息的offset+1命名(当前日志中的第一条消息的offset值命名)。
    2)数值最大为64位long大小。19位数字字符长度,没有数字用0填充。

    举例,有以下三对文件:

    0000000000000000000.log
    0000000000000000000.index
    0000000000000002584.log
    0000000000000002584.index
    0000000000000006857.log
    0000000000000006857.index
    

    以第二个文件为例看下对应的数据结构:

    log和.index存储机制.jpg

    这里面使用的是稀疏索引,需要注意下:

    消息查找过程

    找message-2589,即offset为2589:
    1)先定位segment文件,在0000000000000002584中。
    2)计算查找的offset在日志文件的相对偏移量
    offset - 文件名的数量 = 2589 - 2584 = 5;
    在index文件查找第一个参数的值,若找到,则获取到偏移量,通过偏移量到log文件去找对应偏移量的数据即可;
    本例中没有找到,则找到当前索引中偏移量的上线最接近的值,即3,偏移量文246;然后到log文件中从偏移量为246数据开始向下寻找。

    三、生产者Producer

    3.1 生产者组成

    通过下图看下生产者发送消息的流程:

    kafka producer.png

    1)组装ProducerRecord,执行发送方法。
    2)经过序列化器Seriallizer,将key和value经过序列化成为二进制数组。发送到分区器。
    3)在分区器如果制定了partition,则直接返回对应的partition;否则分配器将基于key值来返回一个分区。
    4)确定分区后,将这些消息放到指定topic和partition的批量消息中。由另外的线程负责发送批量消息。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
    5)broker接收到消息后,如果成功会返回一个RecordMetadata,失败且不重试的话,则会返回一个异常。

    3.2 分区策略

    3.2.1分区解决的问题

    1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
    2)提高了并发,可以以partition为单位进行读写。

    3.2.2分区的使用及其原则

    kafka允许我们在发送消息的时候指定分区,我们需要将发送的消息封装成ProducerRecord:

    ProducerRecord

    然后调用KafkaTemplate中的send方法:

    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
            return this.doSend(record);
        }
    

    从上面的多个构造方法中,我们看到可以传递不通的参数,其实不通的参数有不同的分区原则
    (1)指明partition的情况下,直接将指明的值直接作为partiton值;
    (2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
    (3)既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition。
    值,也就是常说的 round-robin 算法。

    通过如下代码简单实践下:
    三种接口:partition1和partition2、partition3。

        /**
         * 传partition
         *
         * @param topic
         * @param partition
         * @param key
         * @param value
         * @return void
         * @author weirx
         * @date: 2021/2/5
         */
        @RequestMapping("/send/partition1")
        public void sendPartition1(String topic, Integer partition, String key, String value) {
            ProducerRecord producerRecord = new ProducerRecord(topic, partition, new Date().getTime(), key, value);
            producer.sendPartition(producerRecord);
        }
    
        /**
         * 无partition,有key
         *
         * @param topic
         * @param key
         * @param value
         * @return void
         * @author weirx
         * @date: 2021/2/5
         */
        @RequestMapping("/send/partition2")
        public void sendPartition2(String topic, String key, String value) {
            ProducerRecord producerRecord = new ProducerRecord(topic, key, value);
            producer.sendPartition(producerRecord);
        }
    
        /**
         * 没有partition,也没有key
         *
         * @param topic
         * @param value
         * @return void
         * @author weirx
         * @date: 2021/2/5
         */
        @RequestMapping("/send/partition3")
        public void sendPartition3(String topic, String value) {
            ProducerRecord producerRecord = new ProducerRecord(topic, value);
            producer.sendPartition(producerRecord);
        }
    

    按顺序分别测试,参数都如以下方式给:

    # 都传
    http://localhost:8085/test/kafka/send/partition1?topic=test-kafka&key=weirx&value=hello%20kafka&partition=9
    # 无partition
    http://localhost:8085/test/kafka/send/partition2?topic=test-kafka&key=weirx&value=hello%20kafka
    # 无partition和key
    http://localhost:8085/test/kafka/send/partition3?topic=test-kafka&value=hello%20kafka
    

    分别得到结果是:
    传partition的情况下消息确实存储在partition9:

    2021-02-05 12:18:59.241  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 3, CreateTime = 1612498739238, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
    2021-02-05 12:18:59.241  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    

    不传partition的情况,连发三次,都是在parttition3

    2021-02-05 12:20:53.549  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 7, CreateTime = 1612498853547, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
    2021-02-05 12:20:53.549  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:20:59.532  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 8, CreateTime = 1612498859530, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
    2021-02-05 12:20:59.532  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:21:02.261  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 9, CreateTime = 1612498862258, serialized key size = 5, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = weirx, value = hello kafka)
    2021-02-05 12:21:02.261  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    

    都不传的情况,连发11条,发现第一条和第十一条都是partition2,中间没有重复,符合上面的结论。

    2021-02-05 12:22:59.484  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 2, offset = 2, CreateTime = 1612498979481, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:22:59.485  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:00.988  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 1, leaderEpoch = 2, offset = 2, CreateTime = 1612498980985, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:00.988  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:01.666  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 8, leaderEpoch = 2, offset = 2, CreateTime = 1612498981663, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:01.666  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:02.307  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 7, leaderEpoch = 2, offset = 2, CreateTime = 1612498982304, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:02.307  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:02.882  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 6, leaderEpoch = 0, offset = 2, CreateTime = 1612498982880, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:02.882  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:03.402  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 2, offset = 3, CreateTime = 1612498983400, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:03.402  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:04.026  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 4, CreateTime = 1612498984024, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:04.026  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:04.627  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1612498984625, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:04.628  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:05.475  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 4, leaderEpoch = 2, offset = 3, CreateTime = 1612498985473, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:05.475  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:06.154  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 10, CreateTime = 1612498986152, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:06.154  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    2021-02-05 12:23:09.396  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 2, offset = 3, CreateTime = 1612498989394, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
    2021-02-05 12:23:09.397  INFO 10884 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ------------------ message =hello kafka
    

    3.3 ack

    ack(acknowledgement),是kafka的一种确认机制,保证生产者发送的消息能够可靠的的发送到broker。每个topic的partition收到消息后都需要向producer返回一个ack。如果收到表示消息发送成功,否则会再次发送。

    其具体的ack机制如下所示:

    ack.png

    1)左侧表示成功发送,send并且接收到ack,继续执行next send,也收到ack。
    2)右侧表示send首次失败了,则去resend到其他的partition上,成功接收到ack。然后再去执行next send。

    3.3.1 同步机制

    通过上面的分析会产生一个问题:何时发送ack?

    通常有两种方式:
    1)超过半数的follower同步成功
    2)所有follower同步成功。

    kafka采用如上方案的第二种:全部follower通过,发送ack

    采取以上的ack方式,又会引发出下一个问题:当一个follower节点由于某种故障,迟迟不能与leader进行同步,此时leader需要一直等待,直到其同步完成。

    由于上面的问题,kafka提供了一种机制:ISR(in-sync Replicas)

    ISR(in-sync Replicas):Leader会维护一个ISR(in-sync Replicas),内部是所有和leader进行同步的follower,当ISR的所有follower完成同步后,leader会发送ack给producer。如果follower长时间未向leader同步数据,则会将该follower踢出ISR。如果leader宕机了,则会从ISR重新选举leader。

    除此之外还有OSR(Out-Sync Relipcas):不能和leader保持同步的集合。

    3.3.2 ack的应答机制

    kafka为用户提供三种可靠性级别。在springboot的yml文件中通过acks的配置:

    kafka:
        bootstrap-servers: 192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092
        producer:
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的leader节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: all
    

    以上机制酌情使用,都会存在问题:
    0:broker故障,数据丢失。
    1:leader落盘成功并返回ack,follower同步失败或leader故障。
    all:follower同步数据完成,发送ack之前leader故障,则造成了重复数据。

    3.3.3 故障处理机制

    首先了解两个名词:LEO(Log End Offset)HW(High Watermark)
    LEO:每个副本的最后一个offset。
    HW:所有副本中最小的LEO。

    LEO and HW.png
    当leader发生故障时

    但leader发生故障,会从ISR中选出一个新的leader;为了保证副本的一致性,其余的follower会截掉各自log文件中高于HW的部分,然后重新从leader同步数据。

    注意:只保证了副本将数据的一致,不能保证数据不丢失或者不重复。

    当follower发生故障时

    当follower发生故障时,会被踢出ISR,待其恢复时,follower会读取磁盘记录的上次的HW,并将log文件中高于HW的部分截掉,,从HW开始同步,追赶leader,当该follower的LEO大于等于HW时,即已经追上了leader,就可以重新加入ISR了。

    3.4Exactly Once 语义

    上面说过生产者可以设定可靠性的级别:
    当设置为0时,可以保证每条消息只会发送一次,即At Most Once语义。保证不重复。但不保证数据不丢失。
    当设置为all时,可以保证每条消息都会发送成功,即At Least Once语义。保证不丢失,但是不保证不重复。

    那么有没有办法保证既不丢失也不重复呢?
    这里就要提到Exactly Once语义

    在0.11版本的Kafka,引入了一项重大特性:幂等性
    所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条

    幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。
    即:At Least Once + 幂等性 = Exactly Once

    要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可。这个配置目前在新版本中无法通过配置文件直接配置,可以通过手动配置kafka的配置文件添加进去。

    Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个 PID,发往同一Partition的消息会附带Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

    作用范围

    1)PID 重启就会变化。只能实现单回话上的幂等性,这里的会话指的是Producer进程的一次运行。
    2)同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

    如果需要跨会话、跨多个topic-partition的情况,需要使用Kafka的事务性来实现。关于事务的原理后面会讲解。


    本章到此为止,下一章节讲解消费者相关的内容。

    相关文章

      网友评论

        本文标题:Kafka(六)深入原理--架构组成及生产者Producer

        本文链接:https://www.haomeiwen.com/subject/vzdmtltx.html