Notes on Integrating Kafka with Spring Boot 2

Author: virtual灬zzZ | Published 2021-12-04 00:40

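    Automatic topic creation

    If you call send on kafkaTemplate with a topic that does not exist, the topic is created automatically, provided the broker allows it (auto.create.topics.enable). With the broker defaults it has one partition and a replication factor of 1: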

    Topic: llc2021  PartitionCount: 1   ReplicationFactor: 1    Configs: 
        Topic: llc2021  Partition: 0    Leader: 3   Replicas: 3 Isr: 3
    
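    auto-offset-reset on the consumer side

    This setting only takes effect when the consumer group has no committed offset for a partition.

    earliest: if the topic already contains messages, a newly started consumer reads them from the beginning.

    latest: if the topic already contains messages, a newly started consumer skips them and only reads messages produced after it starts.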
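The rule above can be sketched in plain Java. This is an illustrative model, not Kafka client code: the class name, the method name and its parameters are made up for the example, and it only covers the two policies discussed here (Kafka also resets when a committed offset is out of range, which is not modeled).

```java
import java.util.OptionalLong;

/** Illustrative model of auto-offset-reset; not part of the Kafka client. */
final class OffsetResetSketch {

    /**
     * Returns the offset a consumer would start reading from.
     *
     * @param committed the group's committed offset for the partition, if any
     * @param policy    "earliest" or "latest"
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset the next produced message will get
     */
    static long startingOffset(OptionalLong committed, String policy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart; // replay everything that is still in the log
            case "latest":
                return logEnd;   // skip existing messages, wait for new ones
            default:
                throw new IllegalArgumentException("policy not covered by this sketch: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0, 42)); // prints 0
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0, 42));   // prints 42
        System.out.println(startingOffset(OptionalLong.of(17), "latest", 0, 42));    // prints 17
    }
}
```

The third call shows why changing the setting appears to have no effect on a group that has already consumed and committed: the stored offset is used instead.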

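    @KafkaListener in detail
    id: the listener's id

    ①. Naming of consumer threads
    With id: Thread[haha-2-C-1,5,main]; the id determines the thread name prefix
    Without id: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1,5,main]

    ②. Listener ids must be unique within the same container
    Otherwise startup fails with: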

    Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
    

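    ③. The id overrides the consumer factory's group id
    Suppose the configuration file sets the consumer group with kafka.consumer.group-id=BASE-DEMO.
    Normally that is the default consumer group for the container.
    But with @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}),
    the consumer group of this listener becomes consumer-id7.

    If you do not want the id to be used as the group id, set idIsGroup = false; the default group id is then used.

    ④. If the groupId attribute is set, it has the highest priority
    With @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test"), the consumer group of this listener is "groupId-test".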
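The precedence described in ③ and ④ can be summarized in a small plain-Java sketch. It is only a model of the rules as stated in this article, not Spring Kafka's own code; the class and method names are hypothetical, and an empty string stands for an attribute that was not set on the annotation.

```java
/** Illustrative model of how the effective group id is chosen; not Spring Kafka code. */
final class GroupIdResolutionSketch {

    /**
     * @param groupIdAttr    value of the groupId attribute ("" if not set)
     * @param id             value of the id attribute ("" if not set)
     * @param idIsGroup      value of the idIsGroup attribute
     * @param factoryDefault group id configured on the consumer factory
     */
    static String resolveGroupId(String groupIdAttr, String id, boolean idIsGroup, String factoryDefault) {
        if (groupIdAttr != null && !groupIdAttr.isEmpty()) {
            return groupIdAttr;       // rule ④: an explicit groupId always wins
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;                // rule ③: the id doubles as the group id
        }
        return factoryDefault;        // otherwise fall back to the configured default
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("", "consumer-id7", true, "BASE-DEMO"));              // prints consumer-id7
        System.out.println(resolveGroupId("", "consumer-id7", false, "BASE-DEMO"));             // prints BASE-DEMO
        System.out.println(resolveGroupId("groupId-test", "consumer-id5", false, "BASE-DEMO")); // prints groupId-test
    }
}
```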

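    groupId: consumer group name

    Sets the consumer group for this listener; for how it interacts with id, see the id section above.

    topics: the topics to listen to (mutually exclusive with topicPattern and topicPartitions)

    Several topics can be listened to at once:
    topics = {"SHI_TOPIC3","SHI_TOPIC4"}

    topicPattern: listen to topics matching a pattern (mutually exclusive with topics and topicPartitions)

    topicPartitions: explicit partition assignment
    You can configure a listener with explicit topics and partitions (and, optionally, their initial offsets):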

    @KafkaListener(id = "thing2", topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0",
                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        ...
    }
    

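    In the example above, the listener reads partitions 0 and 1 of topic1, and partitions 0 and 1 of topic2, with partition 1 of topic2 starting from offset 100.

    errorHandler: exception handling

    Implement KafkaListenerErrorHandler and handle the exception there: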

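    import org.apache.kafka.clients.consumer.Consumer;
    import org.springframework.kafka.listener.KafkaListenerErrorHandler;
    import org.springframework.kafka.listener.ListenerExecutionFailedException;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
        // Called when the listener method throws an exception
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
            return null;
        }

        // Variant that also receives the Consumer that fetched the record
        @Override
        public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
            // do something with the failure here
            return null;
        }
    }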
    

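    To use it, reference the bean name, for example errorHandler="kafkaDefaultListenerErrorHandler".

    containerFactory: listener container factory

    Names the factory bean that creates the listener container.

    A factory for batch consumption: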

        /**
         * Listener container factory for batch consumption.
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(kafkaConsumerFactory());
            // Enable batch listening; the batch size is capped by the consumer property ConsumerConfig.MAX_POLL_RECORDS_CONFIG
            factory.setBatchListener(true);
            return factory;
        }
    

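    Use it with containerFactory = "batchFactory".

    clientIdPrefix: client id prefix

    Overrides the consumer factory's kafka.consumer.client-id property. It is used as a prefix followed by -n, where n is a number; if it is not set, the client id is consumer-GROUP_ID-n.

    With clientIdPrefix: [Consumer clientId=orion-2, groupId=sadan] Adding newly assigned partitions: test-3

    Without clientIdPrefix: [Consumer clientId=consumer-sadan_fake-3, groupId=sadan_fake] Adding newly assigned partitions: test-3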

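    concurrency: number of concurrent consumers

    Overrides the concurrency set on the container factory. Concurrency here means multi-threaded consumption: on a single machine, setting it to 3 starts 3 clients, that is, 3 consumers, which share the partitions among themselves.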

        /**
         * Listener container factory with a default concurrency of 6.
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(kafkaConsumerFactory());
            factory.setConcurrency(6);
            return factory;
        }
    
        // The concurrency attribute on the annotation overrides the factory's value
        @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1")
    

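    Although the factory concurrencyFactory is configured with a concurrency of 6, the listener ends up with only 1 consumer.

    • Note:

    The total number of consumer threads in the application is concurrency * the number of @KafkaListener methods, assuming they all use the same concurrency (each listens to all partitions by default).

    1. concurrency < number of partitions: consumption is uneven, since one consumer thread may read several partitions
    2. concurrency = number of partitions: the ideal case, one consumer thread per partition
    3. concurrency > number of partitions: some consumer threads get no partition, which wastes resources

    properties: other consumer properties

    For the available Kafka properties see org.apache.kafka.clients.consumer.ConsumerConfig;
    any property with a matching name can be overridden here.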

    Usage

        @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
                , clientIdPrefix = "myClientId5",groupId = "groupId-test",
                properties = {
                        "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
    

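    Configuring concurrency

    Single-node environment:

    When listening to a single topic, concurrency should generally equal the number of partitions, so that each partition has its own consumer thread, which is the most efficient setup. If the two differ, the partition assignment strategy (for example RangeAssignor or RoundRobinAssignor) gives some consumers more partitions than others, so those carry a heavier load. If there are more threads than partitions, some consumers get no partition and their resources are wasted.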
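To make the uneven distribution concrete, here is a minimal plain-Java sketch of a range-style assignment: for each topic, the partitions are split into contiguous ranges over the sorted consumers, and the first consumers receive one extra partition when the division is not even. It is a simplified model written for this article, not Kafka's RangeAssignor; the class name and the short consumer names are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of range-style partition assignment; not Kafka's implementation. */
final class RangeAssignmentSketch {

    /**
     * @param topics    topic name mapped to its partition count
     * @param consumers member ids of the consumers in the group
     * @return member id mapped to its assigned partitions, written as "topic-partition"
     */
    static Map<String, List<String>> assign(Map<String, Integer> topics, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }

        // Each topic is divided independently of the others.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            int partitions = topic.getValue();
            int base = partitions / sorted.size();   // partitions every consumer gets
            int extra = partitions % sorted.size();  // the first 'extra' consumers get one more
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One topic with 4 partitions and 3 consumers
        System.out.println(assign(Map.of("llc2021", 4), List.of("orion-0", "orion-1", "orion-2")));
        // prints {orion-0=[llc2021-0, llc2021-1], orion-1=[llc2021-2], orion-2=[llc2021-3]}

        // Two topics (4 and 3 partitions) and 5 consumers
        System.out.println(assign(Map.of("llc2021", 4, "llc2022", 3), List.of("c1", "c2", "c3", "c4", "c5")));
    }
}
```

Because every topic is divided separately, the second call leaves the fifth consumer with no partition at all even though there are 7 partitions in total, which is the same shape as the sadan_fake assignment shown in this section.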

    Listening to one topic (llc2021) with 4 partitions and 3 consumers:
    @KafkaListener(id = "haha",groupId = "sadan",topics = "llc2021",clientIdPrefix = "orion",concurrency = "3")
     
    orion-0-6337a10a-5d00-47c2-8c47-c2f79fcd2d9d=Assignment(partitions=[llc2021-0, llc2021-1]),
    orion-1-dbc3ad53-deac-47cb-aa18-587f4bf1d884=Assignment(partitions=[llc2021-2]), 
    orion-2-66fb68a5-4fda-4a9c-8201-20eaf5b617d6=Assignment(partitions=[llc2021-3])
    
    
    Listening to 2 topics, llc2021 with 4 partitions and llc2022 with 3 partitions, with 5 consumers:
    @KafkaListener(groupId = "sadan_fake",topics = {"llc2021","llc2022"},concurrency = "5")
    
    consumer-sadan_fake-5-1b672bd3-e75f-43ee-9828-b60ef7a006cf=Assignment(partitions=[]), 
    consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969=Assignment(partitions=[llc2021-1, llc2022-1]),
    consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890=Assignment(partitions=[llc2021-2, llc2022-2]),
    consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036=Assignment(partitions=[llc2021-3]), 
    consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068=Assignment(partitions=[llc2021-0, llc2022-0])}
    
    Check the consumer groups:
    /usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
    sadan           llc2021         0          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
    sadan           llc2021         1          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
    sadan           llc2021         2          1               1               0               orion-1-7867333b-dd71-49e9-97af-054d93b2a183 /192.168.1.106  orion-1
    sadan           llc2021         3          4               4               0               orion-2-fc7be1ef-f620-4e17-b48c-96c85986e5ee /192.168.1.106  orion-2
    
    
    /usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan_fake --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST            CLIENT-ID
    sadan_fake      llc2021         1          0               0               0               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
    sadan_fake      llc2022         1          -               0               -               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
    sadan_fake      llc2022         0          -               0               -               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
    sadan_fake      llc2021         0          0               0               0               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
    sadan_fake      llc2022         2          -               0               -               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
    sadan_fake      llc2021         2          1               1               0               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
    sadan_fake      llc2021         3          4               4               0               consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036 /192.168.1.106  consumer-sadan_fake-4
    
    
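    Cluster environment (4 partitions, concurrency of 3 on each node)

    When several nodes listen to the same topic, each node that starts triggers a rebalance, as shown below.
    After node A starts, all partitions have been assigned to the group's consumers according to the strategy: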

    orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[llc2021-3]), 
    orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[llc2021-2]), 
    orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-0, llc2021-1])
    
    /usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
    sadan           llc2021         0          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
    sadan           llc2021         1          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
    sadan           llc2021         3          5               5               0               orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3 /192.168.1.106  orion-2
    sadan           llc2021         2          2               2               0               orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05 /192.168.1.106  orion-1
    
    

    After node B starts, the number of consumers grows and a rebalance takes place:

    INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
    INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Revoke previously assigned partitions llc2021-2
    INFO 24168 --- [     haha-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-2]
    INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] (Re-)joining group
    INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
    INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Revoke previously assigned partitions llc2021-1, llc2021-0
    INFO 24168 --- [     haha-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-1, llc2021-0]
    INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] (Re-)joining group
    INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
    INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Revoke previously assigned partitions llc2021-3
    INFO 24168 --- [     haha-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-3]
    INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] (Re-)joining group
    INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Finished assignment for group at generation 60: {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]), orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]), consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]), orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]), consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
    
    // Note the finished assignment:
    {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]),
    orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), 
    consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]),
    consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]),
    orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]),
    consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
    
    /usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    sadan           llc2021         1          0               0               0               consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377 /192.168.1.106  consumer-sadan-2
    sadan           llc2021         2          2               2               0               consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e /192.168.1.106  consumer-sadan-3
    sadan           llc2021         3          5               5               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a          /192.168.1.106  orion-0
    sadan           llc2021         0          0               0               0               consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b /192.168.1.106  consumer-sadan-1
    
    

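    The 2 nodes run 6 consumers in total, so 2 of them get no partition and are wasted. Work out the assignment first and then set the number of consumer threads accordingly; a reasonable value is concurrency = partitions / nodeNum.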
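The arithmetic behind this rule of thumb is small enough to write down. The sketch below is plain Java with hypothetical names; rounding up when the partitions do not divide evenly across the nodes is an assumption of this example, since the article only states partitions / nodeNum. It assumes a single topic consumed by one group.

```java
/** Illustrative helper for sizing concurrency per node; names are hypothetical. */
final class ConcurrencyPlanSketch {

    /** Concurrency per node so that every partition has a consumer (rounded up, an assumption). */
    static int concurrencyPerNode(int partitions, int nodes) {
        if (partitions <= 0 || nodes <= 0) {
            throw new IllegalArgumentException("partitions and nodes must be positive");
        }
        return (partitions + nodes - 1) / nodes; // ceiling division
    }

    /** Consumers left without a partition when one topic is shared by the whole group. */
    static int idleConsumers(int partitions, int nodes, int concurrency) {
        return Math.max(0, nodes * concurrency - partitions);
    }

    public static void main(String[] args) {
        System.out.println(concurrencyPerNode(4, 2)); // prints 2
        System.out.println(idleConsumers(4, 2, 3));   // prints 2
        System.out.println(idleConsumers(4, 2, 2));   // prints 0
    }
}
```

With 4 partitions and 2 nodes, a concurrency of 3 leaves 2 consumers idle, while a concurrency of 2 gives each consumer exactly one partition.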

    With concurrency=2 on every node, no consumer is wasted and each one owns exactly one partition:
    
    {orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116=Assignment(partitions=[llc2021-2]),
    consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1=Assignment(partitions=[llc2021-1]),
    orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a=Assignment(partitions=[llc2021-3]), 
    consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba=Assignment(partitions=[llc2021-0])}
    
    /usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    sadan           llc2021         3          5               5               0               orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a          /192.168.1.106  orion-1
    sadan           llc2021         2          2               2               0               orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116          /192.168.1.106  orion-0
    sadan           llc2021         0          0               0               0               consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba /192.168.1.106  consumer-sadan-1
    sadan           llc2021         1          0               0               0               consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1 /192.168.1.106  consumer-sadan-2
    
    

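    Exceptions while receiving messages

    The container can have a global error handler, and you can also create a separate errorHandler per listener and reference it in @KafkaListener. If the handler throws an exception, the offset is not committed (assuming manual offset commits are configured), and the describe command shows CURRENT-OFFSET lagging behind LOG-END-OFFSET.

    Here there is 1 topic with 4 partitions and 2 nodes, one of which is deliberately made to throw exceptions.

    Note the CONSUMER-ID column: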
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
    sadan           llc2021         3          795             801             6               orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e          /192.168.1.106  orion-1
    sadan           llc2021         2          790             791             1               orion-0-d6d8f3cf-29fd-4c1a-99d4-d34b617cf434          /192.168.1.106  orion-0
    sadan           llc2021         1          570             570             0               consumer-sadan-2-b1113c48-9f8c-462a-8bfa-f3a474283c57 /192.168.1.106  consumer-sadan-2
    sadan           llc2021         0          1089            1089            0               consumer-sadan-1-d98a21da-b9d9-43e6-ab1e-13347ba241e2 /192.168.1.106  consumer-sadan-1
    

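    Shutting down the node that deliberately throws triggers a rebalance: all partitions go to the other node, which then consumes the lagging messages. Shutting down the healthy node also triggers a rebalance, of course; the failing node then tries the lagging messages again, but it fails once more and makes no progress.

    Note the CONSUMER-ID column: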
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
    sadan           llc2021         1          572             572             0               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
    sadan           llc2021         3          801             807             6               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
    sadan           llc2021         0          1090            1090            0               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
    sadan           llc2021         2          791             792             1               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
    

