美文网首页spring cloud
SpringCloud集群整合Rabbitmq、延迟队列、Str

SpringCloud集群整合Rabbitmq、延迟队列、Stream

作者: 笔记本一号 | 来源:发表于2021-05-31 01:17 被阅读0次

    代码github地址:https://github.com/tzb1017432592/springcloud-lean
    SpringCloud高可用集群的搭建已在我之前的博客中介绍过。这里我启动了三个服务中心、三个服务网关、两个服务提供客户端、一个消费端和一个nginx服务。在此集群之上我们整合rabbitmq,并启动一个mq生产者端和一个mq消费者端。

    引入依赖
         <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
         <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
         </dependency>
         <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
         </dependency>
    

    yml配置:eureka-rabbitmq-producer是服务的生产者端,eureka-rabbitmq-consumer是服务的消费者端。

    # Producer instance: service name eureka-rabbitmq-producer, profile mq01.
    server:
      port: 8951
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
    spring:
      application:
        name: eureka-rabbitmq-producer
      profiles: mq01
      # RabbitMQ broker connection; both instances use the same virtual host.
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: cloudtest
    
    ---
    # Consumer instance: service name eureka-rabbitmq-consumer, profile mq02.
    server:
      port: 8952
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
    spring:
      application:
        name: eureka-rabbitmq-consumer
      profiles: mq02
      # Same broker and virtual host as the producer document above.
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: cloudtest
    
    配置好idea启动的参数,根据不同的参数启动不同的环境

    代码创建服务生产者端的exchange、queue、binding、routingkey

    /**
     * Declares the topic exchange, the queue and the binding between them for the basic
     * RabbitMQ example.
     *
     * <p>The declarations are active for both the producer profile ({@code mq01}) and the
     * consumer profile ({@code mq02}). When they were restricted to the producer, the
     * consumer's listener on {@link #QUEUE_TEST} depended on the producer having already
     * connected to the broker and created the queue. Declaring identical definitions from
     * both sides is idempotent, so either application can now be started first.
     */
    @Configuration
    @Profile({"mq01", "mq02"})
    public class RabbitmqConfig {

        /** Name of the topic exchange (also used as the exchange bean name). */
        public static final String EXCHANGE_TEST = "exchange_test";

        /** Name of the queue (also used as the queue bean name). */
        public static final String QUEUE_TEST = "queue_test";

        /** Topic binding pattern: {@code cloudtest.} followed by exactly one word. */
        public static final String ROUTINGKEY_TEST = "cloudtest.*";

        /**
         * Builds the durable topic exchange.
         *
         * @return the exchange named {@link #EXCHANGE_TEST}
         */
        @Bean(EXCHANGE_TEST)
        public Exchange exchange() {
            return ExchangeBuilder
                    .topicExchange(EXCHANGE_TEST)
                    .durable(true)
                    .build();
        }

        /**
         * Builds the queue the consumer listens on.
         *
         * @return the queue named {@link #QUEUE_TEST}
         */
        @Bean(QUEUE_TEST)
        public Queue queue() {
            return new Queue(QUEUE_TEST);
        }

        /**
         * Binds the queue to the exchange using {@link #ROUTINGKEY_TEST}.
         *
         * @param exchange the exchange bean qualified as {@link #EXCHANGE_TEST}
         * @param queue    the queue bean qualified as {@link #QUEUE_TEST}
         * @return the binding, without extra arguments
         */
        @Bean
        public Binding binding(
                @Qualifier(EXCHANGE_TEST) Exchange exchange,
                @Qualifier(QUEUE_TEST) Queue queue) {
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with(ROUTINGKEY_TEST)
                    .noargs();
        }
    }
    

    发送消息

    /**
     * REST entry point of the producer application (profile {@code mq01}) that publishes a
     * plain text message to the test exchange.
     */
    @Profile("mq01")
    @RestController
    @RequestMapping("/rabbitmq/producer/")
    public class TestController {

        /** Routing key used when publishing to the test exchange. */
        public static final String ROUTING_KEY1 = "cloudtest.test1";

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Publishes the path variable as a message body.
         *
         * @param message text taken from the request path
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("producer1/{message}")
        public String producer1(@PathVariable("message") String message) {
            final String exchangeName = RabbitmqConfig.EXCHANGE_TEST;
            rabbitTemplate.convertAndSend(exchangeName, ROUTING_KEY1, message);
            return "success";
        }
    }
    

    服务消费者端监听队列

    /**
     * Listener of the consumer application (profile {@code mq02}) that logs every message
     * arriving on the test queue.
     */
    @Slf4j
    @Profile("mq02")
    @Component
    public class RabbitmqConsumer {

        /**
         * Logs the payload together with the message properties.
         *
         * @param payload message body converted to a string
         * @param message the raw message, used here only for its properties
         */
        @RabbitListener(queues = RabbitmqConfig.QUEUE_TEST)
        public void Consumer1(String payload, Message message) {
            final String properties = message.getMessageProperties().toString();
            log.info("payload:【{}】,message:【{}】", payload, properties);
        }
    }
    

    配置好网关

    # Gateway instance: service name eureka-gateway, profile gw01.
    server:
      port: 8851
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
    spring:
      application:
        name: eureka-gateway
      cloud:
        gateway:
          discovery:
            locator:
              enabled: true
          # Explicit routes; each one forwards a path prefix to a service by name.
          routes:
            - id: eureka-client01
              uri: lb://eureka-client01
              predicates:
                - Path=/client01/**
    
            - id: eureka-consumer
              uri: lb://eureka-consumer
              predicates:
                - Path=/consumer/**
    
            # RabbitMQ producer endpoints; listed before the broader /rabbitmq/** route below.
            - id: eureka-rabbitmq-producer
              uri: lb://eureka-rabbitmq-producer
              predicates:
                - Path=/rabbitmq/producer/**
    
            # Remaining /rabbitmq/** paths go to the RabbitMQ consumer service.
            - id: eureka-rabbitmq-consumer
              uri: lb://eureka-rabbitmq-consumer
              predicates:
                - Path=/rabbitmq/**
    
      profiles: gw01
    
    启动服务。服务启动完成后,发送消息。

    延迟队列

    延迟队列有许多应用场景如:自动收货、自动取消订单、自动发布文章等,实现延迟队列需要下载插件
    下载地址:https://www.rabbitmq.com/community-plugins.html

    将下载好的插件放到rabbitmq安装目录的plugins目录中,然后执行命令:
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    代码

    /**
     * Declares the delayed topic exchange, its queue and the binding between them.
     *
     * <p>The declarations are active for both the producer profile ({@code mq01}) and the
     * consumer profile ({@code mq02}). When they were restricted to the producer, the
     * consumer's listener on {@link #DELAY_QUEUE_TEST} depended on the producer having already
     * connected to the broker and created the queue. Declaring identical definitions from
     * both sides is idempotent, so either application can now be started first.
     *
     * <p>The broker needs the {@code rabbitmq_delayed_message_exchange} plugin enabled for the
     * delayed exchange to be declared.
     */
    @Configuration
    @Profile({"mq01", "mq02"})
    public class RabbitmqDelayConfig {

        /** Name of the delayed exchange (also used as the exchange bean name). */
        public static final String DELAY_EXCHANGE_TEST = "delay_exchange_test";

        /** Name of the queue receiving delayed messages (also used as the queue bean name). */
        public static final String DELAY_QUEUE_TEST = "delay_queue_test";

        /** Topic binding pattern: {@code delay_cloudtest.} followed by exactly one word. */
        public static final String ROUTINGKEY_DELAY = "delay_cloudtest.*";

        /**
         * Builds the durable topic exchange with delayed-message support switched on.
         *
         * @return the exchange named {@link #DELAY_EXCHANGE_TEST}
         */
        @Bean(DELAY_EXCHANGE_TEST)
        public Exchange exchange() {
            return ExchangeBuilder
                    .topicExchange(DELAY_EXCHANGE_TEST)
                    .delayed()          // enable delayed-message support
                    .durable(true)
                    .build();
        }

        /**
         * Builds the queue that receives messages once their delay has elapsed.
         *
         * @return the queue named {@link #DELAY_QUEUE_TEST}
         */
        @Bean(DELAY_QUEUE_TEST)
        public Queue queue() {
            return new Queue(DELAY_QUEUE_TEST);
        }

        /**
         * Binds the delay queue to the delayed exchange using {@link #ROUTINGKEY_DELAY}.
         *
         * @param queue    the queue bean qualified as {@link #DELAY_QUEUE_TEST}
         * @param exchange the exchange bean qualified as {@link #DELAY_EXCHANGE_TEST}
         * @return the binding, without extra arguments
         */
        @Bean
        public Binding delayBinding(
                @Qualifier(DELAY_QUEUE_TEST) Queue queue,
                @Qualifier(DELAY_EXCHANGE_TEST) Exchange exchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with(ROUTINGKEY_DELAY)
                    .noargs();
        }
    }
    

    发送延迟消息

    /**
     * REST entry point of the producer application (profile {@code mq01}) that publishes a
     * persistent message with a fixed delay to the delayed exchange.
     */
    @Slf4j
    @Profile("mq01")
    @RestController
    @RequestMapping("/rabbitmq/producer")
    public class TestController {

        /** Routing key of the non-delayed example. */
        public static final String ROUTING_KEY1 = "cloudtest.test1";

        /** Routing key used when publishing to the delayed exchange. */
        public static final String DELAY_ROUTING_KEY1 = "delay_cloudtest.test1";

        @Resource
        private RabbitTemplate rabbitTemplate;

        /**
         * Publishes the path variable as a delayed message and logs the send time.
         *
         * @param message text taken from the request path
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("delayproducer1/{message}")
        public String delayproducer1(@PathVariable("message") String message) {
            rabbitTemplate.convertAndSend(
                    RabbitmqDelayConfig.DELAY_EXCHANGE_TEST,
                    DELAY_ROUTING_KEY1,
                    message,
                    outgoing -> {
                        // Mark the message as persistent.
                        outgoing.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        // Delay before delivery, in milliseconds.
                        outgoing.getMessageProperties().setDelay(5000);
                        return outgoing;
                    });
            final Date sentAt = new Date();
            log.info("发送延迟消息成功【{}】", sentAt);
            return "success";
        }
    }
    

    接收延迟消息

    /**
     * Listener of the consumer application (profile {@code mq02}) that logs every message
     * arriving on the delay queue together with the time of receipt.
     */
    @Slf4j
    @Profile("mq02")
    @Component
    public class RabbitmqConsumer {

        /**
         * Logs the payload, the message properties and the receive time.
         *
         * @param payload message body converted to a string
         * @param message the raw message, used here only for its properties
         */
        @RabbitListener(queues = RabbitmqDelayConfig.DELAY_QUEUE_TEST)
        public void Consumer2(String payload, Message message) {
            final String properties = message.getMessageProperties().toString();
            log.info("payload:【{}】,message:【{}】", payload, properties);

            final Date receivedAt = new Date();
            log.info("接收延迟消息成功【{}】", receivedAt);
        }
    }
    
    访问延迟队列发送接口。我们代码中设置的延迟时间是5秒,发送日志与接收日志的时间正好相差5秒。

    Spring Cloud Stream

    引入依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    
    生产者端

    生产者端 yml

    # Producer instance with a stream output binding, profile mq01.
    server:
      port: 8951
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
    spring:
      application:
        name: eureka-rabbitmq-producer
      profiles: mq01
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: cloudtest
      cloud:
        stream:
          bindings:
            # Producer channel definition
            myOutput:
              # Name of the exchange
              destination:  stream_exchange
    

    代码
    定义生产者通道

    /**
     * Output channel contract of the producer application (profile {@code mq01}).
     */
    @Profile("mq01")
    @Component
    public interface OutputStreamChannel {

        /** Name of the output binding; the producer yml configures a binding with this name. */
        String OUTPUT = "myOutput";

        /**
         * Exposes the channel bound to {@link #OUTPUT}.
         *
         * @return the channel messages are sent to
         */
        @Output(OUTPUT)
        MessageChannel output();
    }
    

    定义生产者生产消息逻辑

    /**
     * Producer-side service (profile {@code mq01}) that sends text messages through the
     * {@link OutputStreamChannel} binding.
     */
    @Slf4j
    @Profile("mq01")
    @Service
    @EnableBinding(OutputStreamChannel.class)
    public class ProducerStreamServiceImpl implements ProducerStreamService {

        @Autowired
        private OutputStreamChannel outputStreamChannel;

        /**
         * Wraps the text in a message and sends it to the output channel.
         *
         * @param msg text used as the message payload
         * @return the value returned by the channel's {@code send} call
         */
        @Override
        public boolean rabbitMQSend(String msg) {
            final boolean sent = outputStreamChannel.output()
                    .send(MessageBuilder.withPayload(msg).build());
            return sent;
        }
    }
    
    

    生产者控制类

    /**
     * REST entry point of the producer application (profile {@code mq01}) that forwards a
     * text message to {@link ProducerStreamService}.
     */
    @Slf4j
    @Profile("mq01")
    @RestController
    @RequestMapping("/rabbitmq/producer")
    public class TestController {

        @Resource
        private ProducerStreamService producerStreamService;

        /**
         * Sends the path variable through the stream service.
         *
         * @param message text taken from the request path
         * @return {@code "success"} when the service reports the send as successful,
         *         otherwise {@code "fail"}
         */
        @GetMapping("streamproducer1/{message}")
        public String streamproducer1(@PathVariable("message") String message) {
            if (producerStreamService.rabbitMQSend(message)) {
                return "success";
            }
            return "fail";
        }
    }
    
    消费者端

    yml

    # Consumer instance with a stream input binding, profile mq02.
    server:
      port: 8952
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
    spring:
      application:
        name: eureka-rabbitmq-consumer
      profiles: mq02
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: cloudtest
      cloud:
        stream:
          bindings:
            # Consumer channel definition
            myInput:
              # Name of the exchange
              destination:  stream_exchange
    
    

    消费者通道

    /**
     * Input channel contract of the consumer application (profile {@code mq02}).
     */
    @Profile("mq02")
    @Component
    public interface InputStreamChannel {

        /** Name of the input binding; the consumer yml configures a binding with this name. */
        String INPUT = "myInput";

        /**
         * Exposes the channel bound to {@link #INPUT}.
         *
         * @return the channel messages are received from
         */
        @Input(INPUT)
        SubscribableChannel input();
    }
    

    定义消费者消费消息逻辑

    /**
     * Consumer-side service (profile {@code mq02}) that logs every message received on the
     * {@link InputStreamChannel} binding.
     */
    @Slf4j
    @Profile("mq02")
    @Service
    @EnableBinding(InputStreamChannel.class)
    public class ConsumerStreamServiceImpl implements ConsumerStreamService {

        /**
         * Logs the received payload.
         *
         * @param msg message payload as text
         */
        @StreamListener(InputStreamChannel.INPUT)
        @Override
        public void rabbitMQreceive(String msg) {
            log.info("stream消费到的消息:【{}】", msg);
        }
    }
    
    启动项目并发送消息。消息成功发送,并且已经被消费到。

    相关文章

      网友评论

        本文标题:SpringCloud集群整合Rabbitmq、延迟队列、Str

        本文链接:https://www.haomeiwen.com/subject/twhvsltx.html