美文网首页
RabbitMQ急速入门

RabbitMQ急速入门

作者: 若兮缘 | 来源:发表于2019-03-11 08:52 被阅读176次

在这一小节,我们来实现一个最简单的消息生产与消费案例,构建生产者和消费者模型。也就是说生产者发送消息,投递到RabbitMQ中,然后消费者去监听队列,获取数据进行消费。

涉及的相关对象或者概念
  • ConnectionFactory:获取连接工厂
  • Connection:一个连接
  • Channel:数据通信信道,可发送和接收消息
  • Queue:具体的消息存储队列
  • Producer & Consumer:生产和消费者
工程准备

1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring Starter Project

然后一直下一步就可以了。后续对 RabbitMQ API 的使用都会在这个工程中进行。

2.在pom文件中引入Maven依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
生产者模型构建
  1. 创建连接工厂ConnectionFactory
  2. 通过连接工厂创建连接Connection
  3. 通过连接创建频道Channel
  4. 通过频道Channel发送数据
  5. 关闭连接
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Procuder {
    
    public static void main(String[] args) throws Exception {
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        
        //4 通过Channel发送数据
        for(int i=0; i < 5; i++){
            String msg = "Hello RabbitMQ!";
            //1 exchange(交换机)   2 routingKey(路由规则)  3 props(修饰消息)  4 body(消息实体)
            channel.basicPublish("", "test001", null, msg.getBytes());
        }

        //5 记得要关闭相关的连接
        channel.close();
        connection.close();
    }
}
消费者模型构建
  1. 创建连接工厂ConnectionFactory
  2. 通过连接工厂创建连接Connection
  3. 通过连接创建频道Channel
  4. 声明一个队列Queue
  5. 创建一个消费者Consumer
  6. 设置Channel,指定消费者监听的队列
  7. 获取消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    public static void main(String[] args) throws Exception {
        
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.43.157");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        
        //4 声明(创建)一个队列
        String queueName = "test001";
        //queueDeclare(queue, durable, exclusive, autoDelete, arguments)
        //queue:队列名称  durable:是否持久化   exclusive:独占,表示这个队列只有这一个channel能去监听,目的是为了保障顺序消费
        //autoDelete:队列如果与其他的exchange都没有绑定关系,那么会自动进行删除  arguments:扩展参数
        channel.queueDeclare(queueName, true, false, false, null);
        
        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
        //6 设置Channel
        //basicConsume(String queue, boolean autoAck, Consumer callback)
        //queue:队列名称   autoAck:是否自动签收   callback:消费者
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            //7 获取消息
            //nextDelivery不带参数会一直阻塞,待参数表示超时时间,超过指定时间没有接收到消息就放行
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端: " + msg);
            //Envelope envelope = delivery.getEnvelope();
        }
    }
}

先启动consumer端,右键Run As --> Java Application
查看管控台:http://192.168.43.157:15672/
可以看到已经有一个连接、频道、队列和消费者了。

然后启动producer端,此时消费端控制台打印了如下内容,证明消息被成功消费了。

消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!
消费端: Hello RabbitMQ!

刷新管控台,可以看到首页显示出了两个线性图,这里能直观看到生产消费的进度以及队列消息的情况。

之所以需要先启动消费端,是因为需要先声明队列才能接收消息,此时队列已经创建好了,就可以先启动生产者再启动消费者,同样可以完成消费。

Default Exchange

这里有个问题就是在生产者投递消息时需要指定exchange,但是我们指定的是空,为什么消息可以被正确投递到队列test001中呢。
这是因为如果生产者在投递消息时不指定exchange,那么会使用rabbitmq默认的exchange,可以通过管控台看到第一个就是,其他的都是rabbitmq内部的交换机。

这个AMPQ default交换机的路由规则是按照指定的routingKey去MQ中查找是否有相同名称的队列,如果有就将消息路由到该队列中,如果没有消息就发送失败。
在下一小节我们会对Exchange进行详细讲解。

相关文章

网友评论

      本文标题:RabbitMQ急速入门

      本文链接:https://www.haomeiwen.com/subject/hwampqtx.html