Notes on integrating Kafka with Spring Boot 2

Author: virtual灬zzZ | Published: 2021-12-04 00:40

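Automatic topic creation

Calling send on kafkaTemplate for a topic that does not exist creates the topic automatically, provided the broker allows it (auto.create.topics.enable, which is true by default). The new topic uses the broker defaults, normally one partition and a replication factor of 1: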

Topic: llc2021  PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: llc2021  Partition: 0    Leader: 3   Replicas: 3 Isr: 3
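auto-offset-reset on the consumer side

This setting only takes effect when the consumer group has no committed offset for a partition, for example when a new group starts.

earliest: if the topic already holds messages, a consumer that starts without a committed offset consumes them from the beginning.

latest: if the topic already holds messages, a consumer that starts without a committed offset skips them and only consumes messages produced after it starts.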
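The rule above can be modelled in a few lines of plain Java. This is a simplified sketch, not the Kafka client's implementation: the class and method names (OffsetResetModel, startingOffset) are made up for illustration, and the out-of-range case and the none setting are deliberately left out.

```java
import java.util.OptionalLong;

/** Simplified model of how a consumer picks its starting offset (illustrative names, not Kafka API). */
final class OffsetResetModel {

    /**
     * @param committed       offset already committed by the group for this partition, if any
     * @param beginning       earliest offset still available in the partition
     * @param end             offset that the next produced message will receive
     * @param autoOffsetReset "earliest" or "latest"
     */
    static long startingOffset(OptionalLong committed, long beginning, long end, String autoOffsetReset) {
        // A committed offset always wins; auto-offset-reset is only a fallback.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginning;
            case "latest":
                return end;
            default:
                throw new IllegalArgumentException("unsupported value: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // New group, topic already holds offsets 0..41.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "earliest"));
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "latest"));
        // Existing group that committed offset 17: the setting is ignored.
        System.out.println(startingOffset(OptionalLong.of(17), 0, 42, "latest"));
    }
}
```

The third call shows why switching an existing group from latest to earliest does not replay old messages: the committed offset is used instead.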

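@KafkaListener in detail
id: the listener id

①. Consumer thread naming
With id: Thread[haha-2-C-1,5,main]; the id determines the thread-name prefix
Without id: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1,5,main]

②. Listener ids must be unique within the same container
Otherwise startup fails with: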

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

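③. The id overrides the consumer factory's group id
Suppose the configuration file sets the consumer group with kafka.consumer.group-id=BASE-DEMO.
Normally that is the default consumer group for the container.
But with @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}),
this consumer's group becomes consumer-id7.

If you do not want the id to be used as the group id, set idIsGroup = false; the default group id is then used.

④. An explicit groupId attribute has the highest priority
With @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test"), this consumer's group id ends up as "groupId-test".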
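The precedence described in ③ and ④ can be summarised as a small decision function. The sketch below is a plain-Java model of that rule only; GroupIdModel and effectiveGroupId are hypothetical names, not part of Spring Kafka, and the real resolution happens inside the framework.

```java
/** Simplified model of the group id precedence for @KafkaListener (illustrative, not Spring API). */
final class GroupIdModel {

    /**
     * @param groupId        value of the groupId attribute ("" when not set)
     * @param id             value of the id attribute ("" when not set)
     * @param idIsGroup      value of the idIsGroup attribute (true by default)
     * @param factoryGroupId group id configured on the consumer factory
     */
    static String effectiveGroupId(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        // 1. An explicit groupId always wins.
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        // 2. Otherwise the listener id is used, unless idIsGroup = false.
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        // 3. Otherwise fall back to the consumer factory's group id.
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(effectiveGroupId("", "consumer-id7", true, "BASE-DEMO"));
        System.out.println(effectiveGroupId("", "consumer-id7", false, "BASE-DEMO"));
        System.out.println(effectiveGroupId("groupId-test", "consumer-id5", false, "BASE-DEMO"));
    }
}
```

The three calls correspond to the three cases in the text: id used as group, idIsGroup = false, and an explicit groupId.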

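groupId: the consumer group name

Sets the consumer group for this listener; for how it interacts with the other settings, see the id attribute above.

topics: the topics to listen to (use exactly one of topics, topicPattern, topicPartitions)

Several topics can be listened to at once:
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern: listen to topics that match a pattern (use exactly one of topics, topicPattern, topicPartitions)

topicPartitions: explicit partition assignment
A listener can be given explicit topics and partitions (and, optionally, initial offsets):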

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

The example above listens to partitions 0 and 1 of topic1, and to partitions 0 and 1 of topic2, where partition 1 of topic2 starts consuming at offset 100.

errorHandler: exception handling

Implement KafkaListenerErrorHandler and handle the exception there:

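import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        // do something with the failed message here
        return null;
    }
}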

To use it, pass the bean name, for example errorHandler="kafkaDefaultListenerErrorHandler".

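containerFactory: the listener container factory

Specifies the factory bean that creates the listener container.

A factory for batch consumption:

    /**
     * Listener container factory for batch consumption.
     */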
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        // Enable batch consumption; the batch size is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the Kafka consumer settings
        factory.setBatchListener(true);
        return factory;
    }

Use it with containerFactory = "batchFactory".

clientIdPrefix: the client id prefix

Overrides the consumer factory's kafka.consumer.client-id property. The prefix is followed by -n, where n is a number; without a prefix the client id takes the form consumer-GROUP_ID-n.

With: [Consumer clientId=orion-2, groupId=sadan] Adding newly assigned partitions: test-3

Without: [Consumer clientId=consumer-sadan_fake-3, groupId=sadan_fake] Adding newly assigned partitions: test-3

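concurrency: the number of concurrent consumers

Overrides the concurrency set on the container factory. Concurrency here means multithreaded consumption: on a single machine, a value of 3 is equivalent to starting 3 clients, that is 3 consumers, which share the partitions between them.

    /**
     * Listener container factory with a concurrency of 6.
     */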
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1")

Although the factory in use is concurrencyFactory (configured with a concurrency of 6), the number of listeners actually created is 1.

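  • One thing to note here:

The total number of consumer threads in the application is concurrency * the number of @KafkaListener methods (each listens to all partitions by default).

  1. concurrency < partition count: consumption is uneven, since one consumer thread may handle several partitions
  2. concurrency = partition count: the ideal case, one consumer thread per partition
  3. concurrency > partition count: some consumer threads have no partition to consume, which wastes resources

properties: additional consumer properties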

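For the available Kafka properties, see org.apache.kafka.clients.consumer.ConsumerConfig;
any property of the same name set on the consumer factory can be overridden here.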
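Each entry of properties is a single string in Java properties-file syntax, so key:value and key=value are both accepted. The sketch below uses only java.util.Properties to show how such strings split into keys and values; ListenerPropertiesModel and parse are hypothetical names, and the assumption that Spring Kafka parses the entries in an equivalent way is mine, not something the original text states.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Shows how "key:value" / "key=value" strings parse with the JDK (illustrative helper, not Spring API). */
final class ListenerPropertiesModel {

    /** Parses each entry as one line of a properties file and merges the results. */
    static Properties parse(String... entries) {
        Properties merged = new Properties();
        for (String entry : entries) {
            try {
                merged.load(new StringReader(entry));
            } catch (IOException e) {
                // A StringReader does not fail in practice; wrap to keep the signature simple.
                throw new UncheckedIOException(e);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties p = parse("enable.auto.commit:false", "max.poll.interval.ms=6000");
        System.out.println(p.getProperty("enable.auto.commit"));
        System.out.println(p.getProperty("max.poll.interval.ms"));
    }
}
```

Note that max.poll.interval.ms is in milliseconds, so 6000 means 6 seconds between polls before the consumer is considered failed.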

Usage

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

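Configuring concurrency

Single-machine environment:

When listening to a single topic, concurrency should generally equal the number of partitions, so that each partition has its own consumer thread, which is the most efficient setup. If the numbers differ, the partition assignment strategy (RoundRobinAssignor or RangeAssignor) gives some consumers more partitions and therefore a heavier load. With too many threads, resources are wasted because some consumers get no partition.

Listening to one topic (llc2021) with 4 partitions, using 3 consumers: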
@KafkaListener(id = "haha",groupId = "sadan",topics = "llc2021",clientIdPrefix = "orion",concurrency = "3")
 
orion-0-6337a10a-5d00-47c2-8c47-c2f79fcd2d9d=Assignment(partitions=[llc2021-0, llc2021-1]),
orion-1-dbc3ad53-deac-47cb-aa18-587f4bf1d884=Assignment(partitions=[llc2021-2]), 
orion-2-66fb68a5-4fda-4a9c-8201-20eaf5b617d6=Assignment(partitions=[llc2021-3])

Listening to 2 topics, one (llc2021) with 4 partitions and one (llc2022) with 3 partitions, using 5 consumers:
@KafkaListener(groupId = "sadan_fake",topics = {"llc2021","llc2022"},concurrency = "5")

consumer-sadan_fake-5-1b672bd3-e75f-43ee-9828-b60ef7a006cf=Assignment(partitions=[]), 
consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969=Assignment(partitions=[llc2021-1, llc2022-1]),
consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890=Assignment(partitions=[llc2021-2, llc2022-2]),
consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036=Assignment(partitions=[llc2021-3]), 
consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068=Assignment(partitions=[llc2021-0, llc2022-0])}
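Both assignments above are consistent with range-style assignment: for each topic, the member ids are sorted, every consumer gets partitions / consumers partitions, and the first partitions % consumers consumers get one extra. The following is a minimal plain-Java model of that rule, assuming every consumer subscribes to every topic; RangeAssignmentModel and the short member ids c1..c5 are made up for illustration, and this is not Kafka's RangeAssignor class itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of range-style partition assignment (illustrative, not Kafka's implementation). */
final class RangeAssignmentModel {

    /**
     * @param consumers member ids of one consumer group (all subscribed to all topics)
     * @param topics    topic name -> number of partitions
     * @return member id -> assigned partitions, written as "topic-partition"
     */
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> topics) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // members are ordered by id

        Map<String, List<String>> result = new TreeMap<>();
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }
        // Each topic is divided independently of the others.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topics).entrySet()) {
            int partitions = topic.getValue();
            int perConsumer = partitions / sorted.size();
            int withExtra = partitions % sorted.size();
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < withExtra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> five = List.of("c1", "c2", "c3", "c4", "c5");
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("llc2021", 4);
        topics.put("llc2022", 3);
        assign(five, topics).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

Because each topic is split on its own, the first consumers in sort order collect a partition from every topic while the last one stays idle, which is the pattern seen in the sadan_fake group.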

Checking the consumer groups
/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         0          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
sadan           llc2021         1          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
sadan           llc2021         2          1               1               0               orion-1-7867333b-dd71-49e9-97af-054d93b2a183 /192.168.1.106  orion-1
sadan           llc2021         3          4               4               0               orion-2-fc7be1ef-f620-4e17-b48c-96c85986e5ee /192.168.1.106  orion-2


/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan_fake --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST            CLIENT-ID
sadan_fake      llc2021         1          0               0               0               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
sadan_fake      llc2022         1          -               0               -               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
sadan_fake      llc2022         0          -               0               -               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
sadan_fake      llc2021         0          0               0               0               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
sadan_fake      llc2022         2          -               0               -               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
sadan_fake      llc2021         2          1               1               0               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
sadan_fake      llc2021         3          4               4               0               consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036 /192.168.1.106  consumer-sadan_fake-4

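Cluster environment (4 partitions, concurrency of 3 on each node)

When several nodes listen to the same topic, each node that starts triggers a rebalance, as shown below.
Node A starts; all partitions are assigned to the group's consumers according to the assignment strategy: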

orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[llc2021-3]), 
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[llc2021-2]), 
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-0, llc2021-1])

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         0          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
sadan           llc2021         1          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
sadan           llc2021         3          5               5               0               orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3 /192.168.1.106  orion-2
sadan           llc2021         2          2               2               0               orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05 /192.168.1.106  orion-1

After node B starts, a rebalance occurs because the group now has more consumers:

INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Revoke previously assigned partitions llc2021-2
INFO 24168 --- [     haha-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-2]
INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Revoke previously assigned partitions llc2021-1, llc2021-0
INFO 24168 --- [     haha-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-1, llc2021-0]
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Revoke previously assigned partitions llc2021-3
INFO 24168 --- [     haha-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-3]
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Finished assignment for group at generation 60: {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]), orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]), consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]), orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]), consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}

// Note the finished assignment:
{orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), 
consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         1          0               0               0               consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377 /192.168.1.106  consumer-sadan-2
sadan           llc2021         2          2               2               0               consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e /192.168.1.106  consumer-sadan-3
sadan           llc2021         3          5               5               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a          /192.168.1.106  orion-0
sadan           llc2021         0          0               0               0               consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b /192.168.1.106  consumer-sadan-1

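The 2 nodes have 6 consumers in total but the topic has only 4 partitions, so 2 consumers get no partition and are wasted. Plan around the assignment strategy first and then set the number of consumer threads accordingly; a reasonable value is concurrency = partitions / nodeNum.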
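The sizing rule can be written as a small helper. This is a sketch for a single topic shared by identical nodes; ConcurrencyPlanner and its method names are hypothetical, and rounding up when the partition count is not divisible by the node count is an assumption added here, since the text only covers the divisible case.

```java
/** Sizing helper for the rule concurrency = partitions / nodeNum (illustrative, single topic). */
final class ConcurrencyPlanner {

    /** Concurrency per node so that every partition has a consumer; rounds up (assumption). */
    static int concurrencyPerNode(int partitions, int nodes) {
        if (partitions <= 0 || nodes <= 0) {
            throw new IllegalArgumentException("partitions and nodes must be positive");
        }
        return (partitions + nodes - 1) / nodes; // integer ceiling
    }

    /** Consumers left without a partition for a given per-node concurrency. */
    static int idleConsumers(int partitions, int nodes, int concurrency) {
        return Math.max(0, nodes * concurrency - partitions);
    }

    public static void main(String[] args) {
        System.out.println(concurrencyPerNode(4, 2)); // 2
        System.out.println(idleConsumers(4, 2, 3));   // 2
        System.out.println(idleConsumers(4, 2, 2));   // 0
    }
}
```

With 4 partitions and 2 nodes, a concurrency of 3 leaves 2 idle consumers, while a concurrency of 2 leaves none.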

With concurrency=2 on every node, no consumer is wasted and each one owns exactly one partition:

{orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1=Assignment(partitions=[llc2021-1]),
orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a=Assignment(partitions=[llc2021-3]), 
consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba=Assignment(partitions=[llc2021-0])}

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         3          5               5               0               orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a          /192.168.1.106  orion-1
sadan           llc2021         2          2               2               0               orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116          /192.168.1.106  orion-0
sadan           llc2021         0          0               0               0               consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba /192.168.1.106  consumer-sadan-1
sadan           llc2021         1          0               0               0               consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1 /192.168.1.106  consumer-sadan-2

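Exceptions while receiving messages

The container can be given a global error handler, and each listener can also define its own errorHandler and reference it in @KafkaListener. If the handler itself throws an exception, the offset is not committed (assuming offsets are committed manually), and the describe command shows the committed offset (CURRENT-OFFSET) falling behind the log end offset (LOG-END-OFFSET), that is, a non-zero LAG.

The setup here is 1 topic, 4 partitions and 2 nodes, one of which deliberately throws exceptions.

Note the CONSUMER-ID column: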
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         3          795             801             6               orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e          /192.168.1.106  orion-1
sadan           llc2021         2          790             791             1               orion-0-d6d8f3cf-29fd-4c1a-99d4-d34b617cf434          /192.168.1.106  orion-0
sadan           llc2021         1          570             570             0               consumer-sadan-2-b1113c48-9f8c-462a-8bfa-f3a474283c57 /192.168.1.106  consumer-sadan-2
sadan           llc2021         0          1089            1089            0               consumer-sadan-1-d98a21da-b9d9-43e6-ab1e-13347ba241e2 /192.168.1.106  consumer-sadan-1

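Shutting down the node that deliberately throws exceptions triggers a rebalance: all partitions go to the other node, which then consumes the lagging messages. Shutting down the healthy node instead also triggers a rebalance; the faulty node then tries again to consume the lagging messages, but it fails once more and makes no progress.

Note the CONSUMER-ID column: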
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         1          572             572             0               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
sadan           llc2021         3          801             807             6               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
sadan           llc2021         0          1090            1090            0               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
sadan           llc2021         2          791             792             1               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
