美文网首页
SpringCloud 集成RabbitMQ使用

SpringCloud 集成RabbitMQ使用

作者: 小强唐 | 来源:发表于2018-12-10 16:49 被阅读71次

    (一)简单使用

    • 1、配置pom包,主要是添加spring-boot-starter-amqp的支持
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 2、配置文件
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    • 3、发送者
    public class HelloSender {
    
    // spring boot 为我们提供的包装类
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
    // 调用 发送消息的方法 
            this.rabbitTemplate.convertAndSend("hello", context);
        }
    
    }
    
    • 4、接受者
        //1. @RabbitListener(queues = "myQueue") // 不能自动创建队列
        //2. 自动创建队列 @RabbitListener(queuesToDeclare = @Queue("myQueue"))
        //3. 自动创建, Exchange和Queue绑定
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue("myQueue"),
                exchange = @Exchange("myExchange")
        ))
        public void process(String message) {
            log.info("MqReceiver: {}", message);
        }
    
        /**
         * 数码供应商服务 接收消息
         * @param message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange("myOrder"),
                key = "computer",
                value = @Queue("computerOrder")
        ))
        public void processComputer(String message) {
            log.info("computer MqReceiver: {}", message);
        }
    
    
        /**
         * 水果供应商服务 接收消息
         * @param message
         */
        @RabbitListener(bindings = @QueueBinding(
                exchange = @Exchange("myOrder"),
                key = "fruit",
                value = @Queue("fruitOrder")
        ))
        public void processFruit(String message) {
            log.info("fruit MqReceiver: {}", message);
        }
    
    • Test
    
    /**
     * 发送mq消息测试
     */
    @Component
    public class MqSenderTest extends OrderApplicationTests {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void send() {
            amqpTemplate.convertAndSend("myQueue", "now " + new Date());
        }
    
        @Test
        public void sendOrder() {
            amqpTemplate.convertAndSend("myOrder", "computer", "now " + new Date());
        }
    }
    

    使用 spring cloud stream 操作 rabbitmq

    • Maven依赖
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    • 使用系统提供 Sink ,创建接收器
      然后创建一个名为SinkReceiver的类,用来接收RabbitMQ发送来的消息,如下:
    @EnableBinding(Sink.class)
    public class SinkReceiver {
        private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
        @StreamListener(Sink.INPUT)
        public void receive(Object playload) {
            logger.info("Received:"+playload);
        }
    }
    

    首先使用了@EnableBinding注解实现对消息通道的绑定,我们在该注解中还传入了一个参数Sink.class,Sink是一个接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义。然后我们在SinkReceiver类中定义了receive方法,并在该方法上添加了@StreamListener注解,该注解表示该方法为消息中间件上数据流的事件监听器,Sink.INPUT参数表示这是input消息通道上的监听处理器。

    • 测试


    • 自定义消息通道
      前面提到了Sink和Source两个接口,这两个接口中分别定义了输入通道和输出通道,而Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,来定义一个自己的消息通道。

    首先我们定义一个接口叫做MySink,如下:

    public interface MySink {
        String INPUT = "mychannel";
    
        @Input(INPUT)
        SubscribableChannel input();
    }
    

    这里我们定义了一个名为mychannel的消息输入通道,@Input注解的参数则表示了消息通道的名称,同时我们还定义了一个方法返回一个SubscribableChannel对象,该对象用来维护消息通道订阅者。然后,我们再定义一个名为MySource的接口,如下:

    public interface MySource {
        @Output(MySink.INPUT)
        MessageChannel output();
    }
    

    @Output注解中描述了消息通道的名称,还是mychannel,然后这里我们也定义了一个返回MessageChannel对象的方法,该对象中有一个向消息通道发送消息的方法。

    最后我们定义一个消息接收类,如下:

    @EnableBinding(value = {MySink.class})
    public class SinkReceiver2 {
        private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);
    
        @StreamListener(MySink.INPUT)
        public void receive(Object playload) {
            logger.info("Received:" + playload);
        }
    }
    

    OK,我们在这里绑定消息通道,然后监听自定义的消息通道,最后来测试一下,如下:

    @RestController
    public class StreamHelloApplicationTests {
    
        @Autowired
        private MySource mySource;
    
       @GetMapping("testMyStream")
        public void contextLoads() {
            mySource.output().send(MessageBuilder.withPayload("hello 123").build());
        }
    }
    

    如果想要发送对象也可以直接发送,不用进行对象转换,如下:

    发送:

    User user= new User (1234, "唐志强", "牛逼");
    mySource.output().send(MessageBuilder.withPayload(user).build());
    

    接收:

    @StreamListener(MySink.INPUT)
    public void receive(User user ) {
        logger.info("Received:" + user );
    }
    

    如果我们想要在接收成功后给一个回执,也是OK的,如下:

    @StreamListener(MySink.INPUT)
    @SendTo(Source.OUTPUT)//定义回执发送的消息通道
    public String receive(Book playload) {
        logger.info("Received:" + playload);
        return "receive msg :" + playload;
    }
    

    方法的返回值就是回执消息,回执消息在系统默认的output通道中,我们如果想要接收这个消息,当然就要监听这个通道,如下:

    @StreamListener(Source.OUTPUT)
    public void receive2(String msg) {
        System.out.println("msg:"+msg);
    }
    
    • rabbitmq 控制面板查看发送的消息,以便判断信息的正确性
    spring:
        stream:
          bindings:
            myMessage:
              group: order
              content-type: application/json # 设置消息发送后为 json 格式,以便控制台查看,接收方会自动转换为 具体的消息数据格式
    
    • 消费组
      由于我们的服务可能会有多个实例同时在运行,如果不做任何设置,此时发送一条消息将会被所有的实例接收到,但是有的时候我们可能只希望消息被一个实例所接收,这个需求我们可以通过消息分组来解决。方式很简单,给项目配置消息组,如下:
    spring.cloud.stream.bindings.mychannel.group=order_group
    

    相关文章

      网友评论

          本文标题:SpringCloud 集成RabbitMQ使用

          本文链接:https://www.haomeiwen.com/subject/sdjacqtx.html