美文网首页
软件篇-kafka(over)-研究-深入研究生产消费模型

软件篇-kafka(over)-研究-深入研究生产消费模型

作者: 秃头猿猿 | 来源:发表于2021-05-13 20:32 被阅读0次

    1.生产者

    1.1 架构

    当需要往broker发送消息时,则需要创建一个或者多个生产者往broker发布消息,虽然借助SpringBootbroker里面发送消息的API比较简单

    如果借助SprinBoot发送消息,后面章节会阐述到

    但是很多时候不同的业务场景会出现不同的问题,例如:

    • 允不允许消息重复
    • 允不允许消息延迟
    • 允不允许消息的丢失

    不同的场景下,光是知道API的使用肯定是满足不了的,因此在使用API之前还需了解发送消息的原理。

    发送消息原理图如下:

    image-20210513100249431

    关于上述原理图解释如下:

    • 生产者源源不断生产消息,一般消息为key-value形式,当然也可以不指定key

    • 消息需要不能直接发送给broker,而是需要经过序列化成为一段字节序列才可以传输

    • 序列完成需要经过分区器,分区器会根据分区分配策略去决定这个消息发往哪个分区

      分区策略分为两种情况,消息有key值和消息没有key值

      • 消息有key值时,会根据key值得hash值然后对分区数进行取模决定消息发送给哪个partition
      • 消息没有key值,会随机发送给某个分区(不同的版本,策略不一样,有的时轮询,有的随机,有的则是一段时间内只发送给某个分区,隔了一段时间发送给另外一个分区)
    • 分区数确定以后真正发送具体的broker上,brokerleader会把消息写入文件中

      写入成功则发送元数据给生产者,如果失败则根据配置是否重试机制进行重试

    1.2 topic

    当消息发送到topic时,其实消息是发送到topicpartition上,而在物理上一个partition就是对应的就是一个目录

    例如:在kafka-eagle上创建wangzhtopic,且分区数为3,副本数为 1,如下:

    image-20210513102003513

    查看该topic详情可知,三个分区其中131上的partition-0leader,其他的如下:

    image-20210513102046056

    同时取查看131机器上的数据/var/data/kafka(这个目录是当时安装时指定的数据存储目录)

    image-20210513102509413

    所以一个分区在物理上对应的就是一个目录

    1.3 存储

    当发送消息时到topicpartitions上,分区会消息写入segment文件上,一个partitions由多个segment文件组成,如下:

    image-20210513110017402

    每个segment文件默认存储数据大小为1G,当然也可以通过修改kafka参数调整

    # 单个segment存储数据大小
    log.segment.bytes=具体内容
    
    # 当超过一定的时间(默认七天),写入segment文件的数据还没有达到1G(默认大小)
    # 也会重新创建新的segment文件
    log.segment.ms=时间
    

    从上图中看出,第一个segment文件的偏移量一定是从0开始的,而下一个segment文件则是从上个segment文件偏移量开始的


    同时segment文件分为.index.log文件,如下:

    image-20210513110632406

    其中.log用来存储真正的数据,.index是索引文件

    假如如果想要消费偏移量为197的文件,如果没有索引则需要从头到位去寻找,而有了索引文件就完全可以提高查询速度

    其中前面一大串代表文件名,第一个segment文件肯定是从0开始,第二个segment文件命名则是以上个文件偏移量+1命名,如下:

    第一个segment文件命名

    0000000000000000.index

    0000000000000000.log

    当上一个文件偏移量为1679898是,那么下个segment文件命名为

    00000000001679899.index

    00000000001679899.log

    以此类推

    1.4 发送

    经过上面的消息,已经知道生产者发送原理,接下来就借助SpringBootbroker发送消息。如下:

    1.4.1 创建

    先创建springboot项目kafka-springboot-test,并且导入kafka依赖,其pom.xml内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.10.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <configuration>
                        <excludes>
                            <exclude>
                                <groupId>org.projectlombok</groupId>
                                <artifactId>lombok</artifactId>
                            </exclude>
                        </excludes>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.yml增加以下配置

    spring:
      kafka:
       # kafka集群地址
        bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
        listener:
        # 如果没有至少一个配置的主题,则容器是否应无法启动
        # false 代表关闭此功能
          missing-topics-fatal: false
        producer:
        # 发布消息时,key的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
        # 生产者通过该序列化器将消息的key值序列化为字节数组
        # 后面会讲述如何自定义序列化器
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
        
        # 发布消息时,value的序列化器,这里是kafka提供的序列化器
        # 当发送消息的key不是字符串时,需要自己写自定义序列化器
        # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
        # 生产者通过该序列化器将消息的key值序列化为字节数组
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    1.4.2 发布

    当配置完成即可发布消息,发消息先创建topic,上文中已经创建了test_topic这里就不再创建了

    发布消息则是借助org.springframework.kafka.core.KafkaTemplate发布消息,直接注入即可,代码如下:

    package com.example.demo;
    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootTest
    @Slf4j
    class DemoApplicationTests {
    
        /**
         * 第一个泛型为 key值的数据类型
         * 第二个泛型为 value值的数据类型
         */
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
           @Test
        void contextLoads() throws Exception {
            ListenableFuture<SendResult<String, String>> resultListenableFuture =
                    kafkaTemplate.send("wangzh", "test-key", "test-topic");
            
            log.info("元数据信息:" + resultListenableFuture.get());
    
            log.info("发送消息完毕");
    
           // 关闭连接
           kafkaTemplate.destroy();
        }
    
    }
    

    执行代码成功后,就在kafka-eagle看到消息发送结果,如下

    image-20210513113951216

    当然也可以指定partition发送消息

    1.4.3 acks

    acks 参数 规则定了必须须要有多少分区副本收到消息,生产者才会认为消息写入是成功的 这个参数对消息丢失 可能性有重要影响,目前该参数配置如下:

    acks = 0

    生产者在成功写入消息之前不会等待任何来自服务器的响应

    意味着生产者不知道消息有没有把消息发送到broker,只要生产者将消息添加到Socket缓冲区就认为消息发送成功,不需要等待服务器 的响应。因此这种方式也可以支持很高的吞吐量

    acks=1

    只要集群的leader节点收到消息并写入到segment文件,生产者就会收到来自服务器的成功响应,视为发送成功

    假如leader数据写入成功,然后宕机,此时所有的副本还没来的及同步数据,那么

    刚写入的数据就会丢失

    acks=all

    集群的leader收到消息并写入到segment中,同时等待所有的副本同步消息成功后才认为消息发送成功

    这种模式是最安全的,及时有的leader发生奔溃,那还是可以重新选举leader进行通信

    在配置文件的producer里面设置acks即可

    image-20210513133303905

    2.消费者

    2.1 架构

    消费者如果订阅了某个主题消息,那么就可以去进行消费,同时一个消费者属于一个消费组,一个消费组里面所有的消费者都订阅同一个主题.如下:

    image-20210513141948856

    当消费组里面只有一个消费者时,那么这个消费就回去消费所有分区的消息,当然一般开发也就足够了。

    但有时候生产者生产消息过快,而消费者消费消息过慢,就会很容易导致消息堆积,从而阻塞,那么就可以在消费者组里面多增加几个消费者,如下:

    image-20210513143001268

    注意:同一组的消费者是不会消费同一主题的同一分区消息

    当然如果消费者的数量超过了分区数,那么超过的消费者就会处于空闲状态

    image-20210513143347884

    因此不要让消费者的数量超过分区数

    一个消息只能被一个组消费一次,例如上图中consumer-1消费了消息A,那么其他的消费者就不能够再次消费A

    如果在消费时,手动指定了偏移量,那么就会重复消费消息,这种情况特殊

    当然同一个消息可以被多个消费组进行消费,如下图所示:

    image-20210513145040997

    2.2 分配

    如下图,消费组中可以增加,当增加一个消费者,就会分摊之前消费者的消费压力,那么当新增一个消费者是如何将分区分配给消费者的呢

    image-20210513143001268

    当消费者新增一个消费者时,会提高消费者的高可用和伸缩性,且当加入到消费组之后就会

    给新增的消费者分配一个partition,这种操作称为再分配

    注意:在再分配期间,消费者会暂停消费消息,直到分配分区完成才会继续消费消息

    且当分区分配给再次分配给某个消费者时,消费者的消息可能丢失读取状态

    同理当consumer-2消费者退出消费者组时,那么partition-2就会分配到consumer-1,让他去进行消费

    那么kafka是如何知道消费组里面需要再分配呢?这主要是借助于组协调器,每个消费组都会由属于自己的组协调器。

    每隔消费者都会发送心跳到协调器,用来维护群组关系和分区关系,如下图所示:

    image-20210513151828938

    这样kafka就知道了每个消费者属于哪个消费组,以及如何去分配partition

    协调器就类似于spring cloud里面的注册中心

    当消费者因为某些因素突然停止消费,也就是说协调器收不到消费者的心跳,那么协调器会等待几秒,几秒期间还是没有收到心跳,那么协调器就会把该消费者剔除出组,然后实现再分配。

    2.3 消费

    这里同样借助SpringBoot去消费消息,消费者配置如下:

    spring:
      kafka:
        # kafka集群地址
        bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
        listener:
          # 如果没有至少一个配置的主题,则容器是否应无法启动
          # false 代表关闭此功能
          missing-topics-fatal: false
          producer:
            # 发布消息时,key的序列化器,这里是kafka提供的序列化器
            # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
            # 生产者通过该序列化器将消息的key值序列化为字节数组
            # 后面会讲述如何自定义序列化器
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
    
            # 发布消息时,value的序列化器,这里是kafka提供的序列化器
            # 当发送消息的key不是字符串时,需要自己写自定义序列化器
            # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
            # 生产者通过该序列化器将消息的key值序列化为字节数组
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          # 消费者组id
          group-id: wangzh-group
    
          # 是否允许自动提交offset
          # 每当消费者消费一个消息就会产生一个偏移量
          # 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
          enable-auto-commit: true
    
          # 提交偏移量间隔时间数 100ms提交一次
          auto-commit-interval: 100
    
          # 消费消息时的反序列器
          # 消费消息时会将字节序列反序列化为字符串
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
          # 消费消息时的反序列化器
          # 消费消息时会将字节序反序列化为字符串
          # 如果消息不是字符串时,需要自己写反序列话器
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
          # 偏移量配置
          # latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
          # none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
          # 量那么抛出异常
          # earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
          # 从头开始消费
          auto-offset-reset: latest
    

    消费消,利用org.springframework.kafka.annotation.KafkaListener注解即可消费消息,如下:

    package com.example.demo;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class Consumer {
    
        /**
         * topics 为 topic名字,可以填写多个topic的名字
         * ConsumerRecord 为 消息记录,包含了一条消息大部分数据
         */
        @KafkaListener(topics = {"wangzh"})
        public void consumer(ConsumerRecord<String,String> record) {
          log.info("消息key:" + record.key());
          log.info("消息value:" + record.value());
          log.info("消息偏移量:" + record.offset());
          log.info("消息topic" + record.topic());
        }
    }
    

    启动项目即可看到消费的消息,如下:

    image-20210513154435765

    2.4 批量

    上次消费消息时一条一条消费,也就是当一条消息消费完成,才会去消费下一条,这肯定不大合理,因此在数据量大的情况下需要去进行批量消费

    批量消费设置如下:

    spring:
      kafka:
        # kafka集群地址
        bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
        listener:
          # 如果没有至少一个配置的主题,则容器是否应无法启动
          # false 代表关闭此功能
          missing-topics-fatal: false
          producer:
            # 发布消息时,key的序列化器,这里是kafka提供的序列化器
            # 当发送消息的key值不是字符串时,需要自己写自定义序列化器
            # 生产者通过该序列化器将消息的key值序列化为字节数组
            # 后面会讲述如何自定义序列化器
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
    
            # 发布消息时,value的序列化器,这里是kafka提供的序列化器
            # 当发送消息的key不是字符串时,需要自己写自定义序列化器
            # 一般来说发布消息大多数都不是字符串,因此还是需要发送消息
            # 生产者通过该序列化器将消息的key值序列化为字节数组
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 设置为批量消费,默认为单条消费
          type: batch
        consumer:
          # 消费者组id
          group-id: wangzh-group
    
          # 是否允许自动提交offset
          # 每当消费者消费一个消息就会产生一个偏移量
          # 偏移量是消费者提交到kafka中,保存在`__consumer_offsets` topic中
          enable-auto-commit: true
    
          # 提交偏移量间隔时间数 100ms提交一次
          auto-commit-interval: 100
    
          # 消费消息时的反序列器
          # 消费消息时会将字节序列反序列化为字符串
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
          # 消费消息时的反序列化器
          # 消费消息时会将字节序反序列化为字符串
          # 如果消息不是字符串时,需要自己写反序列话器
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
          # 偏移量配置
          # latest 当各个分区有已提交的偏移量是,就从提交的偏移量后开始消费,如果没有则消费该分区最新产生的数据
          # none 各个分区都提交了偏移量后,才从偏移量后开始消费,只要存在一个分区没有提交偏移
          # 量那么抛出异常
          # earlist 当各个分区有已提交的偏移量时,则从提交的偏移量开始消费,如果没有偏移量则
          # 从头开始消费
          auto-offset-reset: latest
    
          # 批量消费时,最多一次消费多少条数据
          max-poll-records: 1000
    
    
    image-20210513155124595 image-20210513155139439

    同时还需要修改接受消息的参数,修改如下:

    @KafkaListener(topics = {"wangzh"})
    public void consumer(List<ConsumerRecord<String,String>> records) {
        records.forEach(record -> {
            log.info("消息key:" + record.key());
            log.info("消息value:" + record.value());
            log.info("消息偏移量:" + record.offset());
            log.info("消息topic" + record.topic());
        });
    }
    
    image-20210513155347649

    2.5 指定

    通过之前的学习知道,消费者每消费一条消息就会提交一次偏移量,下次消费时从偏移量后面开始消费,这样保证消息不会重复消费。

    有时候有一种特殊情况,需要指定偏移量去进行消费,那么之前普通消费并不能满足,因此需要自定义操作


    package com.example.demo;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.annotation.PartitionOffset;
    import org.springframework.kafka.annotation.TopicPartition;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    @Component
    @Slf4j
    public class Consumer {
    
        /**
         * 每次消费 {"0","1","2"} 消息偏移量从1开始消费
         * @param records
         */
        @KafkaListener(topicPartitions = {
                @TopicPartition(topic = "wangzh",partitions = {"0","1","2"},partitionOffsets = @PartitionOffset(initialOffset =  "1",partition = "*"))
        })
        public void consumer(List<ConsumerRecord<String,String>> records) {
            records.forEach(record -> {
                log.info("消息key:" + record.key());
                log.info("消息value:" + record.value());
                log.info("消息偏移量:" + record.offset());
                log.info("消息topic" + record.topic());
            });
        }
    }
    

    相关文章

      网友评论

          本文标题:软件篇-kafka(over)-研究-深入研究生产消费模型

          本文链接:https://www.haomeiwen.com/subject/xqixjltx.html