美文网首页程序员
Spring Boot 之 RabbitMQ 集成之路

Spring Boot 之 RabbitMQ 集成之路

作者: 糯米团子_大芒果 | 来源:发表于2019-02-18 17:40 被阅读0次

最近集成了RabbitMQ消息队列,试着研究一下RabbitMQ的几种模式,参考官方文档
新建项目什么的不讲了。
配置pom.xml文件,用到了springboot对于AMQP(高级消息队列协议,即面向消息的中间件的设计)

<!-- 添加springboot对amqp的支持 -->
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>

创建个rabbit目录,然后新建三个类,sender,receiver,config

Work模式

@Configuration
public class MQConfig {
    
    public static final String QUEUE = "queue";
    
    /**
     * Direct模式 交换机Exchange
     * */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }
}

config里配置好一个最简单的queue
然后sender里使用AmqpTemplate去发送消息

@Service
public class MQSender {

    private static Logger log = LoggerFactory.getLogger(MQSender.class);
    
    @Autowired
    AmqpTemplate amqpTemplate ;
    
    public void send(Object message) {
        String msg = RedisService.beanToString(message);//这里只是我其他类里一个Object对象转String对象,你们可以自己实现
        log.info("send message:"+msg);
        amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    }
}

然后在 receiver里接收消息

@Service
public class MQReceiver {

    private static Logger log = LoggerFactory.getLogger(MQReceiver.class);  

    @RabbitListener(queues=MQConfig.QUEUE)
    public void receive(String message) {
        log.info("receive message:"+message);
    }
}

在Controller调用send方法运行之后控制台会输出如下结果

2019-02-18 11:31:33.377 INFO 2356 --- [nio-8080-exec-1] com.imooc.miaosha.rabbitmq.MQSender : send message:hello,world
2019-02-18 11:31:33.514 INFO 2356 --- [cTaskExecutor-1] com.imooc.miaosha.rabbitmq.MQReceiver : receive message:hello,world

公平派遣与循环派遣

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。在这种模式下,调度不一定完全符合我们的要求。例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙,而另一个工作人员几乎不会做任何工作。那么,RabbitMQ对此一无所知,仍然会均匀地发送消息。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

Fanout 交换机模式

正如您可能从名称中猜到的那样,它只是将收到的所有消息广播到它知道的所有队列中。而这正是我们传播信息所需要的。


参照官方实例的实现,代码如下
config,里面的自动删除队列

@Configuration
public class MQConfig {
    /**
     * Fanout模式 交换机Exchange
     * */
    @Bean
    public Queue autoDeleteQueue1()  {
        return  new AnonymousQueue();
    }
    @Bean
    public Queue autoDeleteQueue2 ()  {
        return  new AnonymousQueue();
    }
    @Bean
    public FanoutExchange fanoutExchage(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding FanoutBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchage());
    }
    @Bean
    public Binding FanoutBinding2() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchage());
    }
}

临时队列 (AnonymousQueue)

您可能还记得以前我们使用的是具有特定名称的队列。能够命名队列对我们来说至关重要 - 我们需要将工作人员指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名非常重要。

但是我们的Fanout示例并非如此。我们希望了解所有消息,而不仅仅是其中的一部分消息。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。

首先,每当我们连接到Rabbit时,我们都需要一个新的空队列。为此,我们可以使用随机名称创建队列,或者更好 - 让服务器为我们选择随机队列名称。

其次,一旦我们断开消费者,就应该自动删除队列。为了使用Spring AMQP客户端执行此操作,我们定义了AnonymousQueue,它使用生成的名称创建一个非持久的,独占的自动删除队列

sender里添加代码:

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendFanout() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots.incrementAndGet() == 3) {
            dots.set(1);
        }
        for (int i = 0; i < dots.get(); i++) {
            builder.append('.');
        }
        builder.append(count.incrementAndGet());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message);
        log.info("sendFanout [x] Sent '" + message + "'");
    }

receiver里添加代码:

        @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveFanout1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverFanout 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverFanout 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveFanout2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverFanout 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverFanout 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }
运行之后控制台可以看到如下信息,可见exchange把消息广播给了所有的receiver。

适用于需要广播的操作

Direct 交换机模式

我们会将消息发送给特定的队列,而不是广播。我们使用一个key作为路由键。这样接收程序将能够选择它想要接收(或订阅)的key



config代码里添加:

    /**
     * Direct模式 交换机Exchange
     * */

    @Bean
    public DirectExchange direct(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }
    @Bean
    public Binding DirectBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1())
                .to(direct())
                .with("orange");
    }
    @Bean
    public Binding DirectBinding2() {
        return BindingBuilder.bind(autoDeleteQueue1())
                .to(direct())
                .with("black");
    }
    @Bean
    public Binding DirectBinding3() {
        return BindingBuilder.bind(autoDeleteQueue2())
                .to(direct())
                .with("green");
    }
    @Bean
    public Binding DirectBinding4() {
        return BindingBuilder.bind(autoDeleteQueue2())
                .to(direct())
                .with("black");
    }

sender代码里添加:

    private final String[] keys = {"orange", "black", "green"};


    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendDirect() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (dots.incrementAndGet() == 3) {
            dots.set(0);
        }
        String key = keys[this.dots.get()];
        builder.append(key).append(' ');
        builder.append(this.count.get());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, key, message);
        log.info("sendDirect [x] Sent '" + message + "'");
    }

receiver代码添加:

    @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveFanout1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverDirect 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverDirect 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveFanout2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverDirect 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverDirect 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }
运行后结果如下,可见,两个receiver都能收到black信息,而receiver1只能收到green信息而receiver2只能收到orange信息。

Topic 交换机模式

主题交流
发送到主题交换的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密钥中可以包含任意数量的单词,最多可达255个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换- 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是绑定键有两个重要的特殊情况:

  • *(星号)可以替代一个单词。
  • #(hash)可以替换零个或多个单词。



    config添加代码如下:

    /**
     * Topic模式 交换机Exchange
     * */
    @Bean
    public TopicExchange topicExchage(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.orange.*");
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.*.rabbit");
    }
    @Bean
    public Binding topicBinding3() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(topicExchage()).with("lazy.#");
    }

sender添加如下代码:

    private final String[] topickeys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendTopic() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (this.dots.incrementAndGet() == topickeys.length) {
            this.dots.set(0);
        }
        String key = topickeys[this.dots.get()];
        builder.append(key).append(' ');
        builder.append(this.count.incrementAndGet());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, key, message);
        log.info(" [x] Sent '" + message + "'");
    }

receiver代码添加如下:

    @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveTopic1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverTopic 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverTopic 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveTopic2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverTopic 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverTopic 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

运行后得到如下结果,路由密钥设置为“ quick.orange.rabbit ”的消息将传递到两个队列。消息“ lazy.orange.elephant ”也将同时发送给他们。另一方面,“ quick.orange.fox ”只会转到第一个队列,而“ lazy.brown.fox ”只会转到第二个队列。“ lazy.pink.rabbit ”将仅传递到第二个队列一次,即使它匹配两个绑定。“ quick.brown.fox ”与任何绑定都不匹配,因此它将被丢弃。

如果我们违反合同并发送带有一个或四个单词的消息,例如“ orange ”或“ quick.orange.male.rabbit ”,会发生什么?好吧,这些消息将不匹配任何绑定,并将丢失。

另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

可见,topic模式能够实现更加灵活的控制信息流向

相关文章

网友评论

    本文标题:Spring Boot 之 RabbitMQ 集成之路

    本文链接:https://www.haomeiwen.com/subject/iwioeqtx.html