美文网首页
四、Spring Cloud Stream+Kafka保证消费顺

四、Spring Cloud Stream+Kafka保证消费顺

作者: 一介书生独醉江湖 | 来源:发表于2022-04-08 09:12 被阅读0次
    1个Topic(主题)只创建1个Partition(分区)
    
    使用idea创建两个module , kafka-producer-order , kafka-consumer-order
    
    pom.xml  kafka相关
    
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <!--<version>2.6.4</version>  去掉这一行 -->
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
    
    消费者(kafka-consumer-order) application.yml
    
    server:
      port: 8083
    spring:
      application:
        name: kafka_consumer_order
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo-order:                          #这里可以任意写,生产者应与之一致
              destination: custom-message-topic-order   #这里可以任意写,生产者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * @Author ds
     * @Date 2022-04-07
     */
    public interface StreamClient {
    
        String STREAM_DEMO = "stream-demo-order";
    
        @Input(StreamClient.STREAM_DEMO)
        SubscribableChannel streamDataInput();
    }
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    /**
     * @Author ds
     * @Date 2022-04-07
     */
    @Slf4j
    @EnableBinding(StreamClient.class)
    public class ReceiveData {
    
        @StreamListener(StreamClient.STREAM_DEMO)
        public void consume(String message){
            log.info("接收消息: {} " , message);
        }
    }
    
    生产者(kafka-producer-order) application.yml
    
    server:
      port: 8182
    
    spring:
      application:
        name: kafka_producer_order
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092   #Kafka的消息中间件服务器
              zk-nodes: localhost:2181  #Zookeeper的节点,如果集群,后面加,号分隔
              auto-create-topics: true  #如果设置为false,就不会自动创建Topic 有可能你Topic还没创建就直接调用了。
          bindings:
            stream-demo-order:                          #这里可以任意写,消费者应与之一致
              destination: custom-message-topic-order   #这里可以任意写,消费者应与之一致,消息发往的目的地
              content-type: application/json      #消息发送的格式,接收端不用指定格式,但是发送端要; 文本则为 text/plain
    
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    public interface StreamClient {
    
        String STREAM_DEMO = "stream-demo-order";
    
        @Output(StreamClient.STREAM_DEMO)
        MessageChannel streamDataOut();
    }
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Author ds
     * @Date 2022-04-02
     */
    @RestController
    @EnableBinding(StreamClient.class)
    public class TestController {
    
        @Autowired
        private StreamClient streamClient;
    
        @GetMapping("/produce")
        public String produce(){
            for(int i = 0; i < 100 ; i++){
                streamClient.streamDataOut().send(MessageBuilder.withPayload("消息" + i).build());
            }
            return "成功";
        }
    }
    

    相关文章

      网友评论

          本文标题:四、Spring Cloud Stream+Kafka保证消费顺

          本文链接:https://www.haomeiwen.com/subject/kaussrtx.html