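Automatic topic creation
When kafkaTemplate.send() writes to a topic that does not exist yet, the topic is created automatically (this requires auto.create.topics.enable=true on the broker, which is the default). The new topic uses the broker defaults num.partitions and default.replication.factor, which give one partition and one replica unless they have been changed: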
Topic: llc2021 PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: llc2021 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
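If the partition count matters, the topic can be declared explicitly instead of relying on auto-creation. A minimal sketch, assuming Spring Boot's auto-configured KafkaAdmin is active (it creates declared NewTopic beans that do not exist yet at startup); the class name TopicConfig and the partition and replica counts are illustrative:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declares llc2021 with 4 partitions and 1 replica (illustrative values);
    // KafkaAdmin creates it at startup if it does not exist yet.
    @Bean
    public NewTopic llc2021() {
        return TopicBuilder.name("llc2021")
                .partitions(4)
                .replicas(1)
                .build();
    }
}
```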
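auto-offset-reset on the consumer side
This setting only applies when the consumer group has no committed offset for a partition (for example, a brand-new group) or its committed offset is no longer valid. Once an offset has been committed, the consumer resumes from it regardless of this setting.
earliest: if the topic already contains messages (i.e. has offsets), a consumer started under these conditions consumes from the beginning.
latest: if the topic already contains messages, a consumer started under these conditions does not consume them; it only consumes messages that arrive afterwards. This is Kafka's default.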
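The decision above can be written as a small function. This is a hypothetical, simplified model (OffsetResetModel and startPosition are made-up names), not the Kafka client's code; it only captures the rule that a valid committed offset always wins and the reset policy is the fallback:

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how a consumer chooses its starting
// position in one partition; not the Kafka client's actual implementation.
final class OffsetResetModel {

    static long startPosition(OptionalLong committed, long logStartOffset,
                              long logEndOffset, String autoOffsetReset) {
        // A committed offset still inside the log always wins;
        // auto.offset.reset is not consulted in that case.
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStartOffset && offset <= logEndOffset) {
                return offset;
            }
        }
        // No usable committed offset: fall back to the reset policy.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;   // replay everything still retained
            case "latest":
                return logEndOffset;     // only records produced from now on
            case "none":
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A new group on a partition holding offsets 0..9 (log end offset 10).
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, "earliest")); // 0
        System.out.println(startPosition(OptionalLong.empty(), 0, 10, "latest"));   // 10
        // A group that already committed offset 4 resumes there under either policy.
        System.out.println(startPosition(OptionalLong.of(4), 0, 10, "latest"));     // 4
    }
}
```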
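@KafkaListener explained
id: the listener id
①. Consumer thread naming
With an id: Thread[haha-2-C-1,5,main]; the id determines the thread-name prefix
Without an id: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1,5,main]
②. Listener ids must be unique within the same Spring container (application context)
Otherwise startup fails with:
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id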
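③. The id overrides the consumer factory's group id
Suppose the configuration file sets the consumer group kafka.consumer.group-id=BASE-DEMO.
Normally that is the default consumer group for every listener in the container.
But with @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
this consumer's group becomes consumer-id7.
If you do not want the id to be used as the group id, set idIsGroup = false; the default group id is then used again.
④. If the groupId attribute is set, it has the highest priority
With @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test"), the consumer's group id ends up as "groupId-test".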
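The precedence in ③ and ④ boils down to a small decision function. This is a hypothetical model written for illustration (GroupIdResolver and effectiveGroupId are made-up names), not Spring Kafka's own code; the empty-string arguments stand for the annotation's unset id and groupId:

```java
// Hypothetical model of the group id precedence described above; the real
// logic lives inside Spring Kafka and may differ in details between versions.
final class GroupIdResolver {

    /**
     * @param id             @KafkaListener id ("" when not set)
     * @param idIsGroup      @KafkaListener idIsGroup (true by default)
     * @param groupId        @KafkaListener groupId ("" when not set)
     * @param factoryGroupId group id configured on the consumer factory
     */
    static String effectiveGroupId(String id, boolean idIsGroup, String groupId, String factoryGroupId) {
        if (!groupId.isEmpty()) {
            return groupId;          // ④ an explicit groupId has the highest priority
        }
        if (idIsGroup && !id.isEmpty()) {
            return id;               // ③ the id doubles as the group id
        }
        return factoryGroupId;       // default group from the configuration file
    }

    public static void main(String[] args) {
        System.out.println(effectiveGroupId("consumer-id7", true, "", "BASE-DEMO"));              // consumer-id7
        System.out.println(effectiveGroupId("consumer-id7", false, "", "BASE-DEMO"));             // BASE-DEMO
        System.out.println(effectiveGroupId("consumer-id5", false, "groupId-test", "BASE-DEMO")); // groupId-test
    }
}
```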
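groupId: the consumer group name
Sets the consumer group of this listener; for how it interacts with the other group id settings, see the id attribute above.
topics: the topics to listen to (mutually exclusive with topicPattern and topicPartitions)
Several topics can be listened to at once:
topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern: listen to every topic whose name matches a regular expression (mutually exclusive with topics and topicPartitions)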
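A topicPattern listener, e.g. @KafkaListener(topicPattern = "SHI_TOPIC\\d+"), subscribes to every topic whose name matches the regular expression. The standalone snippet below only uses java.util.regex to show which names such a pattern selects; the topic list and the class name TopicPatternDemo are made up, and it assumes, as the Kafka consumer does for pattern subscriptions, that the whole name must match rather than a substring:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Shows which topic names a subscription pattern would select.
// The topic list is invented for the example.
final class TopicPatternDemo {

    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())   // whole-name match, not find()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("SHI_TOPIC3", "SHI_TOPIC4", "SHI_TOPIC_DLT", "llc2021");
        System.out.println(matchingTopics("SHI_TOPIC\\d+", topics)); // [SHI_TOPIC3, SHI_TOPIC4]
    }
}
```

Topics created later that match the pattern are picked up on the consumer's next metadata refresh (metadata.max.age.ms, 5 minutes by default), not immediately.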
topicPartitions: explicit partition assignment
You can configure the listener with explicit topics and partitions (and, optionally, initial offsets):
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
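    // handle the record; topic, partition and offset identify where it came from
    System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());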
}
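The example above listens to partitions 0 and 1 of topic1, and to partitions 0 and 1 of topic2, where partition 1 of topic2 starts consuming from offset 100. Because the partitions are assigned explicitly, this listener does not take part in consumer-group rebalancing.
errorHandler: exception handling
Implement KafkaListenerErrorHandler and do the exception handling there: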
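import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        // Returning normally marks the exception as handled; rethrow it to pass it on to the container's error handler
        return null;
    }
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        // This variant also receives the Consumer, e.g. to seek back and re-read the failed record
        // do something
        return null;
    }
}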
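Reference it by bean name in the annotation, e.g. errorHandler="kafkaDefaultListenerErrorHandler"
containerFactory: the listener container factory
Specifies the factory that creates the listener container.
Factory for batch consumption: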
/**
* Listener container factory for batch consumption
* @return a factory whose listeners receive records in batches
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
//Enable batch consumption; the batch size is bounded by ConsumerConfig.MAX_POLL_RECORDS_CONFIG (max.poll.records) in the Kafka consumer configuration
factory.setBatchListener(true);
return factory;
}
Use it with containerFactory = "batchFactory"
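batchFactory() calls kafkaConsumerFactory(), which is not shown above. One possible version is sketched below together with a batch listener; the broker address, group id, deserializers, batch size of 100 and the names BatchConsumerConfig, BatchListener and batch-demo are placeholders, and batchFactory() would be added to the same @Configuration class so that its direct call to kafkaConsumerFactory() compiles:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

@Configuration
class BatchConsumerConfig {

    // Hypothetical consumer factory matching the name used by batchFactory().
    @Bean
    public ConsumerFactory<Integer, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "BASE-DEMO");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Upper bound on records returned by one poll(), i.e. on one batch.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

@Component
class BatchListener {

    // With setBatchListener(true) the method receives a whole poll() result as a list.
    @KafkaListener(id = "batch-demo", topics = "SHI_TOPIC3", containerFactory = "batchFactory")
    public void onBatch(List<ConsumerRecord<Integer, String>> records) {
        System.out.println("received a batch of " + records.size() + " records");
    }
}
```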
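clientIdPrefix: client id prefix
Overrides the kafka.consumer.client-id property of the consumer factory. It is used as a prefix followed by -n, where n is a number; if no prefix is set, the client id takes the form consumer-GROUP_ID-n.
With a prefix: [Consumer clientId=orion-2, groupId=sadan] Adding newly assigned partitions: test-3
Without: [Consumer clientId=consumer-sadan_fake-3, groupId=sadan_fake] Adding newly assigned partitions: test-3
concurrency: number of concurrent consumers
Overrides the concurrency set on the listener container factory. Concurrency here means multithreaded consumption: on a single machine, setting it to 3 is equivalent to starting 3 clients, i.e. 3 consumers, among which the partitions are divided.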
/**
* Listener container factory with concurrency 6
* @return a factory whose containers start 6 consumers each
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(6);
return factory;
}
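@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1")
Although it uses concurrencyFactory (which sets concurrency to 6), this listener ends up with only 1 consumer, because the annotation's concurrency takes precedence over the factory's.
- Note:
The total number of consumer threads in the application is concurrency × the number of @KafkaListener annotations (each listener consumes all partitions of its topics by default).
- When concurrency < the number of partitions, the load is uneven: one consumer thread may consume several partitions
- When concurrency = the number of partitions, it is the ideal state: each consumer thread consumes one partition
- When concurrency > the number of partitions, some consumer threads have no partition to consume, which wastes resources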
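The three cases above reduce to simple arithmetic. A hypothetical helper (ConcurrencyPlanner is a made-up name) for one application instance, assuming each listener subscribes to a single topic and the assignor spreads partitions as evenly as possible:

```java
// Hypothetical back-of-the-envelope helper for the rules above, for one
// application instance; each listener is assumed to subscribe to one topic.
final class ConcurrencyPlanner {

    /** Total consumer threads: concurrency x number of @KafkaListener annotations. */
    static int totalThreads(int concurrency, int listeners) {
        return concurrency * listeners;
    }

    /** Threads of one listener that get no partition (concurrency > partitions). */
    static int idleThreads(int partitions, int concurrency) {
        return Math.max(0, concurrency - partitions);
    }

    /** Most partitions a single thread of one listener must handle (concurrency < partitions). */
    static int maxPartitionsPerThread(int partitions, int concurrency) {
        return (partitions + concurrency - 1) / concurrency;   // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(totalThreads(3, 2));           // 6
        System.out.println(maxPartitionsPerThread(4, 3)); // 2: one thread owns two partitions
        System.out.println(idleThreads(4, 4));            // 0: the ideal case
        System.out.println(idleThreads(4, 6));            // 2: two threads sit idle
    }
}
```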
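properties: other Kafka properties
See org.apache.kafka.clients.consumer.ConsumerConfig for the available Kafka properties;
any property set here overrides the consumer factory's property of the same name.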
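Per the Spring Kafka reference documentation, each properties entry is written as in a Java properties file, so key:value, key=value and key value are all accepted. The standalone snippet below (ListenerPropertiesDemo is a made-up name) uses only java.util.Properties to show how such entries split into keys and values; it does not call Spring Kafka itself:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Parses listener property strings the way a Java properties file is read.
final class ListenerPropertiesDemo {

    static Properties parse(String... entries) throws IOException {
        Properties props = new Properties();
        // One entry per line, as in a .properties file.
        props.load(new StringReader(String.join("\n", entries)));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties p = parse("enable.auto.commit:false", "max.poll.interval.ms:6000",
                "max.poll.records=100");
        System.out.println(p.getProperty("enable.auto.commit"));   // false
        System.out.println(p.getProperty("max.poll.interval.ms")); // 6000
        System.out.println(p.getProperty("max.poll.records"));     // 100
    }
}
```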
Usage
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
, clientIdPrefix = "myClientId5",groupId = "groupId-test",
properties = {
"enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
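Choosing concurrency
Single node:
When listening to a single topic, concurrency is usually set to the number of partitions, so that every partition has its own consumer thread for maximum efficiency. If the two differ, the assignment strategy (RoundRobinAssignor or RangeAssignor) gives some consumers more partitions than others, so they carry a heavier load; if there are more threads than partitions, resources are wasted because some consumers get no partition at all.
One topic (llc2021) with 4 partitions and 3 consumers: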
@KafkaListener(id = "haha",groupId = "sadan",topics = "llc2021",clientIdPrefix = "orion",concurrency = "3")
orion-0-6337a10a-5d00-47c2-8c47-c2f79fcd2d9d=Assignment(partitions=[llc2021-0, llc2021-1]),
orion-1-dbc3ad53-deac-47cb-aa18-587f4bf1d884=Assignment(partitions=[llc2021-2]),
orion-2-66fb68a5-4fda-4a9c-8201-20eaf5b617d6=Assignment(partitions=[llc2021-3])
Two topics, llc2021 with 4 partitions and llc2022 with 3 partitions, and 5 consumers:
@KafkaListener(groupId = "sadan_fake",topics = {"llc2021","llc2022"},concurrency = "5")
consumer-sadan_fake-5-1b672bd3-e75f-43ee-9828-b60ef7a006cf=Assignment(partitions=[]),
consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969=Assignment(partitions=[llc2021-1, llc2022-1]),
consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890=Assignment(partitions=[llc2021-2, llc2022-2]),
consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036=Assignment(partitions=[llc2021-3]),
consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068=Assignment(partitions=[llc2021-0, llc2022-0])}
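The assignments above follow the range strategy. A simplified sketch of it (RangeAssignmentSketch is a made-up name, not Kafka's RangeAssignor class), assuming every member subscribes to every listed topic and members are ordered by id; the member ids in main are the ones from the sadan_fake log with the UUID suffix dropped:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of range assignment (the idea behind Kafka's RangeAssignor),
// not the client's actual code. For each topic, members are sorted by id and the
// partitions are split into contiguous ranges; the first (partitions % members)
// members each receive one extra partition.
final class RangeAssignmentSketch {

    static Map<String, List<String>> assign(List<String> memberIds, Map<String, Integer> topicPartitions) {
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return result;
        }
        for (Map.Entry<String, Integer> topic : topicPartitions.entrySet()) {
            int base = topic.getValue() / members.size();   // partitions every member gets
            int extra = topic.getValue() % members.size();  // members that get one more
            int next = 0;
            for (int i = 0; i < members.size(); i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    result.get(members.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("llc2021", 4);
        topics.put("llc2022", 3);
        List<String> members = List.of("consumer-sadan_fake-1", "consumer-sadan_fake-2",
                "consumer-sadan_fake-3", "consumer-sadan_fake-4", "consumer-sadan_fake-5");
        assign(members, topics).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

Running main prints the same partitions as the sadan_fake log above, including the empty list for consumer-sadan_fake-5. Because the split is done topic by topic, the lowest-sorted members receive a partition from every topic while the last member gets nothing; RoundRobinAssignor instead deals out the partitions of all topics together, so with 7 partitions every one of the 5 consumers would get at least one.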
Inspecting the consumer groups
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 0 0 0 0 orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106 orion-0
sadan llc2021 1 0 0 0 orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106 orion-0
sadan llc2021 2 1 1 0 orion-1-7867333b-dd71-49e9-97af-054d93b2a183 /192.168.1.106 orion-1
sadan llc2021 3 4 4 0 orion-2-fc7be1ef-f620-4e17-b48c-96c85986e5ee /192.168.1.106 orion-2
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan_fake --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan_fake llc2021 1 0 0 0 consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106 consumer-sadan_fake-2
sadan_fake llc2022 1 - 0 - consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106 consumer-sadan_fake-2
sadan_fake llc2022 0 - 0 - consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106 consumer-sadan_fake-1
sadan_fake llc2021 0 0 0 0 consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106 consumer-sadan_fake-1
sadan_fake llc2022 2 - 0 - consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106 consumer-sadan_fake-3
sadan_fake llc2021 2 1 1 0 consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106 consumer-sadan_fake-3
sadan_fake llc2021 3 4 4 0 consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036 /192.168.1.106 consumer-sadan_fake-4
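Cluster (4 partitions, concurrency 3 on each node)
When several nodes listen to the same topic, every node that starts triggers a rebalance, as shown below.
After node A starts, all partitions have been assigned to the group's consumers according to the strategy: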
orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[llc2021-3]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[llc2021-2]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-0, llc2021-1])
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 0 0 0 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 1 0 0 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 3 5 5 0 orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3 /192.168.1.106 orion-2
sadan llc2021 2 2 2 0 orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05 /192.168.1.106 orion-1
After node B starts, there are more consumers, so a rebalance takes place:
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-1, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-1, groupId=sadan] Revoke previously assigned partitions llc2021-2
INFO 24168 --- [ haha-1-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-2]
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-1, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-0, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-0, groupId=sadan] Revoke previously assigned partitions llc2021-1, llc2021-0
INFO 24168 --- [ haha-0-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-1, llc2021-0]
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-0, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-2, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-2, groupId=sadan] Revoke previously assigned partitions llc2021-3
INFO 24168 --- [ haha-2-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-3]
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-2, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-2, groupId=sadan] Finished assignment for group at generation 60: {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]), orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]), consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]), orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]), consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
//note the partitions in the "Finished assignment" line
{orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]),
consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 1 0 0 0 consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377 /192.168.1.106 consumer-sadan-2
sadan llc2021 2 2 2 0 consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e /192.168.1.106 consumer-sadan-3
sadan llc2021 3 5 5 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 0 0 0 0 consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b /192.168.1.106 consumer-sadan-1
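The two nodes run 6 consumers in total, so 2 of them get no partition and are wasted. Work out the assignment strategy first and then set the number of consumer threads accordingly; a sensible value is concurrency = partitions / nodeNum.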
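The rule partitions / nodeNum assumes the division is exact. One way to handle a remainder, sketched below with made-up names (ClusterConcurrency, concurrencyPerNode), is to round up, so every partition can still get its own thread at the cost of a few idle ones; rounding down instead avoids idle threads but leaves some threads with two partitions. The sketch assumes one listener per node, a single topic, and all nodes in the same consumer group:

```java
// Hypothetical sizing helper for one listener that runs on several nodes in the
// same consumer group and subscribes to a single topic.
final class ClusterConcurrency {

    /** Smallest per-node concurrency that gives every partition its own thread. */
    static int concurrencyPerNode(int partitions, int nodes) {
        return (partitions + nodes - 1) / nodes;   // ceiling of partitions / nodes
    }

    /** Consumer threads in the group that end up with no partition. */
    static int idleConsumers(int partitions, int nodes, int concurrencyPerNode) {
        return Math.max(0, nodes * concurrencyPerNode - partitions);
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(4, 2, 3));   // 2: the 2-node, concurrency=3 run above
        System.out.println(concurrencyPerNode(4, 2)); // 2: the concurrency=2 run below
        System.out.println(concurrencyPerNode(4, 3)); // 2: 6 threads for 4 partitions, 2 idle
    }
}
```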
With concurrency=2 on each node, no consumer is wasted and each one owns one partition:
{orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1=Assignment(partitions=[llc2021-1]),
orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba=Assignment(partitions=[llc2021-0])}
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 3 5 5 0 orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a /192.168.1.106 orion-1
sadan llc2021 2 2 2 0 orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116 /192.168.1.106 orion-0
sadan llc2021 0 0 0 0 consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba /192.168.1.106 consumer-sadan-1
sadan llc2021 1 0 0 0 consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1 /192.168.1.106 consumer-sadan-2
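Exceptions while receiving messages
The container has a global error handler; you can also create separate errorHandlers and configure them on individual @KafkaListener annotations. If the handler throws an exception, the offset is not committed (assuming offsets are committed manually), and the command shows the committed offset (CURRENT-OFFSET) falling behind LOG-END-OFFSET, i.e. a non-zero LAG.
Here there is 1 topic with 4 partitions and 2 nodes, one of which deliberately throws an exception.
Note the CONSUMER-ID column: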
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 3 795 801 6 orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e /192.168.1.106 orion-1
sadan llc2021 2 790 791 1 orion-0-d6d8f3cf-29fd-4c1a-99d4-d34b617cf434 /192.168.1.106 orion-0
sadan llc2021 1 570 570 0 consumer-sadan-2-b1113c48-9f8c-462a-8bfa-f3a474283c57 /192.168.1.106 consumer-sadan-2
sadan llc2021 0 1089 1089 0 consumer-sadan-1-d98a21da-b9d9-43e6-ab1e-13347ba241e2 /192.168.1.106 consumer-sadan-1
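Shutting down the node that deliberately throws triggers a rebalance: all partitions go to the other node, which then consumes the lagging messages. Shutting down the healthy node instead also triggers a rebalance; the faulty node then tries once more to consume the lagging messages, but fails again and cannot make progress.
Note the CONSUMER-ID column: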
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 1 572 572 0 orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106 orion-1
sadan llc2021 3 801 807 6 orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106 orion-1
sadan llc2021 0 1090 1090 0 orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106 orion-0
sadan llc2021 2 791 792 1 orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106 orion-0
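A sketch of the manual-commit setup this section assumes, with made-up names (ManualAckConfig, manualAckFactory, ManualAckListener, manual-ack-demo) and a ConsumerFactory&lt;String, String&gt; bean assumed to exist, for example the one Spring Boot auto-configures with enable.auto.commit=false. The listener acknowledges only after processing succeeds, so an exception leaves the offset uncommitted and the LAG visible in kafka-consumer-groups.sh. What happens to the failed record afterwards depends on the container's error handler and the Spring Kafka version: recent versions use DefaultErrorHandler, which by default retries the record a few times and then logs and skips it, so check that behavior before relying on the lag.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
class ManualAckConfig {

    // Listeners built by this factory commit an offset only when they call acknowledge().
    // Assumes the injected consumer factory has enable.auto.commit=false.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

@Component
class ManualAckListener {

    @KafkaListener(id = "manual-ack-demo", groupId = "sadan", topics = "llc2021",
            containerFactory = "manualAckFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record);   // if this throws, acknowledge() below is never reached
        ack.acknowledge(); // commit this record's offset only after it was processed
    }

    // Hypothetical business logic standing in for the deliberately failing node.
    private void process(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            throw new IllegalArgumentException("cannot process offset " + record.offset());
        }
    }
}
```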