美文网首页
三、Spring Cloud Stream+Kafka重复消费问

三、Spring Cloud Stream+Kafka重复消费问

作者: 一介书生独醉江湖 | 来源:发表于2022-04-07 15:08 被阅读0次
    在 [二、Spring Cloud Stream整合Kafka](https://www.jianshu.com/p/eed59e87e45a)
    的基础上再创建一个module , kafka-consumer2 (创建过程可参考https://www.jianshu.com/p/d7771682688b)
    
    子级 生产者(kafka-producer) application.yml 更新后如下:
    
    server:
      port: 8181
    
    spring:
      application:
        name: kafka_producer
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
              auto-add-partitions: true     # 当partition-count设置的值超过原来设置的值,true=自动创建分区
          bindings:
            stream-demo:                          #这里可以任意写,消费者应与之一致
              destination: custom-message-topic   #这里可以任意写,消费者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
              producer:
                # 分区的数量(默认为1)
                partition-count: 2
    
    主要目的是设置分区的值
    
    auto-add-partitions: true
    
    producer:
      # 分区的数量(默认为1)
       partition-count: 2
    
    子级 消费者(kafka-consumer) application.yml 更新后如下:
    
    server:
      port: 8081
    
    kafka:
      group: kafka_g1
    spring:
      application:
        name: kafka_consumer
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo:                          #这里可以任意写,生产者应与之一致
              destination: custom-message-topic   #这里可以任意写,生产者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
              group: ${kafka.group}
    
    
    主要目的是分组
    
    kafka:
      group: kafka_g1
    
    group: ${kafka.group}
    
    子级 消费者(kafka-consumer2) application.yml 同kafka-consumer基本一致
    
    server:
      port: 8082
    
    kafka:
      group: kafka_g1
    spring:
      application:
        name: kafka_consumer2
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo:                          #这里可以任意写,生产者应与之一致
              destination: custom-message-topic   #这里可以任意写,生产者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
              group: ${kafka.group}
    
    
    测试  http://localhost:8181/produce
    
    消费者kafka-consumer 打印如下:
    
    2022-04-07 15:40:56.561  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息26 
    2022-04-07 15:40:56.562  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息27 
    2022-04-07 15:40:56.564  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息28 
    2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息29 
    2022-04-07 15:40:56.565  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息30 
    2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息31 
    2022-04-07 15:40:56.566  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息32 
    2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息46 
    2022-04-07 15:40:56.580  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息47 
    2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息49 
    2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息55 
    2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息56 
    2022-04-07 15:40:56.581  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息59 
    2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息60 
    2022-04-07 15:40:56.582  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息64 
    2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息68 
    2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息74 
    2022-04-07 15:40:56.583  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息76 
    2022-04-07 15:40:56.584  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息77 
    2022-04-07 15:40:56.587  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息84 
    2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息86 
    2022-04-07 15:40:56.588  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息87 
    2022-04-07 15:40:56.589  INFO 21711 --- [container-0-C-1] c.e.test.kafkaconsumer.test.ReceiveData  : 接收消息: 消息93 
    
    
    消费者kafka-consumer2 打印如下:
    
    2022-04-07 15:40:56.599  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息0 
    2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息1 
    2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息2 
    2022-04-07 15:40:56.600  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息3 
    2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息4 
    2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息5 
    2022-04-07 15:40:56.601  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息6 
    2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息7 
    2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息8 
    2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息9 
    2022-04-07 15:40:56.602  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息10 
    2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息11 
    2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息12 
    2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息13 
    2022-04-07 15:40:56.603  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息14 
    2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息15 
    2022-04-07 15:40:56.604  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息16 
    2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息17 
    2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息18 
    2022-04-07 15:40:56.605  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息19 
    2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息20 
    2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息21 
    2022-04-07 15:40:56.606  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息22 
    2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息23 
    2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息24 
    2022-04-07 15:40:56.607  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息25 
    2022-04-07 15:40:56.627  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息33 
    2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息34 
    2022-04-07 15:40:56.628  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息35 
    2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息36 
    2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息37 
    2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息38 
    2022-04-07 15:40:56.629  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息39 
    2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息40 
    2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息41 
    2022-04-07 15:40:56.630  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息42 
    2022-04-07 15:40:56.631  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息48 
    2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息53 
    2022-04-07 15:40:56.632  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息54 
    2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息57 
    2022-04-07 15:40:56.633  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息58 
    2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息63 
    2022-04-07 15:40:56.634  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息65 
    2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息66 
    2022-04-07 15:40:56.635  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息67 
    2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息70 
    2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息73 
    2022-04-07 15:40:56.636  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息75 
    2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息78 
    2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息79 
    2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息82 
    2022-04-07 15:40:56.637  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息43 
    2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息44 
    2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息45 
    2022-04-07 15:40:56.638  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息50 
    2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息51 
    2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息52 
    2022-04-07 15:40:56.639  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息61 
    2022-04-07 15:40:56.640  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息62 
    2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息69 
    2022-04-07 15:40:56.641  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息71 
    2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息72 
    2022-04-07 15:40:56.642  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息80 
    2022-04-07 15:40:56.643  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息81 
    2022-04-07 15:40:56.656  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息96 
    2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息97 
    2022-04-07 15:40:56.657  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息98 
    2022-04-07 15:40:56.658  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息83 
    2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息85 
    2022-04-07 15:40:56.659  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息88 
    2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息89 
    2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息90 
    2022-04-07 15:40:56.660  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息91 
    2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息92 
    2022-04-07 15:40:56.661  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息94 
    2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息95 
    2022-04-07 15:40:56.662  INFO 21709 --- [container-0-C-1] c.e.t.kafkaconsumer2.test.ReceiveData    : 接收消息: 消息99 
    
    
    
    遇到问题2:
    Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: 
    The number of expected partitions was: 2, but 1 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
    
    解决:
    生产者yml文件中 spring.cloud.stream.kafka.binder 后面加入
    auto-add-partitions: true     
    # 当partition-count设置的值超过原来设置的值,true=自动创建分区
    # partition-count: 2 这里的值超过原来设置的值,如果不是自动创建分区会抛上面的异常
    

    相关文章

      网友评论

          本文标题:三、Spring Cloud Stream+Kafka重复消费问

          本文链接:https://www.haomeiwen.com/subject/tffssrtx.html