在 [二、Spring Cloud Stream整合Kafka](https://www.jianshu.com/p/eed59e87e45a)
的基础上再创建一个module , kafka-consumer2 (创建过程可参考https://www.jianshu.com/p/d7771682688b)
子级 生产者(kafka-producer) application.yml 更新后如下:
server:
port: 8181
spring:
application:
name: kafka_producer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
auto-add-partitions: true # 当partition-count设置的值超过原来设置的值,true=自动创建分区
bindings:
stream-demo: #这里可以任意写,消费者应与之一致
destination: custom-message-topic #这里可以任意写,消费者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
producer:
# 分区的数量(默认为1)
partition-count: 2
主要目的是设置分区的值
auto-add-partitions: true
producer:
# 分区的数量(默认为1)
partition-count: 2
子级 消费者(kafka-consumer) application.yml 更新后如下:
server:
port: 8081
kafka:
group: kafka_g1
spring:
application:
name: kafka_consumer
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
bindings:
stream-demo: #这里可以任意写,生产者应与之一致
destination: custom-message-topic #这里可以任意写,生产者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
group: ${kafka.group}
主要目的是分组
kafka:
group: kafka_g1
group: ${kafka.group}
子级 消费者(kafka-consumer2) application.yml 同kafka-consumer基本一致
server:
port: 8082
kafka:
group: kafka_g1
spring:
application:
name: kafka_consumer2
cloud:
stream:
kafka:
binder:
brokers: localhost:9092 #Kafka的消息中间件服务器
zk-nodes: localhost:2181 #Zookeeper的节点,如果集群,后面加,号分隔
auto-create-topics: true #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
bindings:
stream-demo: #这里可以任意写,生产者应与之一致
destination: custom-message-topic #这里可以任意写,生产者应与之一致,消息发往的目的地
content-type: application/json #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
group: ${kafka.group}
测试 http://localhost:8181/produce
消费者kafka-consumer 打印如下:
2022-04-07 15:40:56.561 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息26
2022-04-07 15:40:56.562 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息27
2022-04-07 15:40:56.564 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息28
2022-04-07 15:40:56.565 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息29
2022-04-07 15:40:56.565 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息30
2022-04-07 15:40:56.566 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息31
2022-04-07 15:40:56.566 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息32
2022-04-07 15:40:56.580 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息46
2022-04-07 15:40:56.580 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息47
2022-04-07 15:40:56.581 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息49
2022-04-07 15:40:56.581 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息55
2022-04-07 15:40:56.581 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息56
2022-04-07 15:40:56.581 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息59
2022-04-07 15:40:56.582 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息60
2022-04-07 15:40:56.582 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息64
2022-04-07 15:40:56.583 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息68
2022-04-07 15:40:56.583 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息74
2022-04-07 15:40:56.583 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息76
2022-04-07 15:40:56.584 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息77
2022-04-07 15:40:56.587 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息84
2022-04-07 15:40:56.588 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息86
2022-04-07 15:40:56.588 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息87
2022-04-07 15:40:56.589 INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData : 接收消息: 消息93
消费者kafka-consumer2 打印如下:
2022-04-07 15:40:56.599 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息0
2022-04-07 15:40:56.600 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息1
2022-04-07 15:40:56.600 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息2
2022-04-07 15:40:56.600 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息3
2022-04-07 15:40:56.601 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息4
2022-04-07 15:40:56.601 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息5
2022-04-07 15:40:56.601 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息6
2022-04-07 15:40:56.602 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息7
2022-04-07 15:40:56.602 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息8
2022-04-07 15:40:56.602 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息9
2022-04-07 15:40:56.602 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息10
2022-04-07 15:40:56.603 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息11
2022-04-07 15:40:56.603 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息12
2022-04-07 15:40:56.603 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息13
2022-04-07 15:40:56.603 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息14
2022-04-07 15:40:56.604 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息15
2022-04-07 15:40:56.604 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息16
2022-04-07 15:40:56.605 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息17
2022-04-07 15:40:56.605 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息18
2022-04-07 15:40:56.605 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息19
2022-04-07 15:40:56.606 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息20
2022-04-07 15:40:56.606 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息21
2022-04-07 15:40:56.606 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息22
2022-04-07 15:40:56.607 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息23
2022-04-07 15:40:56.607 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息24
2022-04-07 15:40:56.607 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息25
2022-04-07 15:40:56.627 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息33
2022-04-07 15:40:56.628 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息34
2022-04-07 15:40:56.628 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息35
2022-04-07 15:40:56.629 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息36
2022-04-07 15:40:56.629 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息37
2022-04-07 15:40:56.629 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息38
2022-04-07 15:40:56.629 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息39
2022-04-07 15:40:56.630 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息40
2022-04-07 15:40:56.630 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息41
2022-04-07 15:40:56.630 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息42
2022-04-07 15:40:56.631 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息48
2022-04-07 15:40:56.632 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息53
2022-04-07 15:40:56.632 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息54
2022-04-07 15:40:56.633 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息57
2022-04-07 15:40:56.633 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息58
2022-04-07 15:40:56.634 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息63
2022-04-07 15:40:56.634 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息65
2022-04-07 15:40:56.635 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息66
2022-04-07 15:40:56.635 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息67
2022-04-07 15:40:56.636 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息70
2022-04-07 15:40:56.636 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息73
2022-04-07 15:40:56.636 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息75
2022-04-07 15:40:56.637 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息78
2022-04-07 15:40:56.637 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息79
2022-04-07 15:40:56.637 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息82
2022-04-07 15:40:56.637 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息43
2022-04-07 15:40:56.638 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息44
2022-04-07 15:40:56.638 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息45
2022-04-07 15:40:56.638 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息50
2022-04-07 15:40:56.639 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息51
2022-04-07 15:40:56.639 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息52
2022-04-07 15:40:56.639 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息61
2022-04-07 15:40:56.640 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息62
2022-04-07 15:40:56.641 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息69
2022-04-07 15:40:56.641 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息71
2022-04-07 15:40:56.642 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息72
2022-04-07 15:40:56.642 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息80
2022-04-07 15:40:56.643 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息81
2022-04-07 15:40:56.656 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息96
2022-04-07 15:40:56.657 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息97
2022-04-07 15:40:56.657 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息98
2022-04-07 15:40:56.658 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息83
2022-04-07 15:40:56.659 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息85
2022-04-07 15:40:56.659 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息88
2022-04-07 15:40:56.660 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息89
2022-04-07 15:40:56.660 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息90
2022-04-07 15:40:56.660 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息91
2022-04-07 15:40:56.661 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息92
2022-04-07 15:40:56.661 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息94
2022-04-07 15:40:56.662 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息95
2022-04-07 15:40:56.662 INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData : 接收消息: 消息99
遇到问题2:
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException:
The number of expected partitions was: 2, but 1 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
解决:
生产者yml文件中 spring.cloud.stream.kafka.binder 后面加入
auto-add-partitions: true
# 当partition-count设置的值超过原来设置的值,true=自动创建分区
# partition-count: 2 这里的值超过原来设置的值,如果不是自动创建分区会抛上面的异常
网友评论