美文网首页
快速入门RabbitMQ核心概念

快速入门RabbitMQ核心概念

作者: 端碗吹水 | 来源:发表于2020-11-23 17:26 被阅读0次

    哪些互联网大厂在使用RabbitMQ,为什么?

    初识RabbitMQ:

    • RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

    哪些互联网大厂在使用RabbitMQ:

    • 滴滴、美团、头条、去哪儿、艺龙

    为什么使用RabbitMQ:

    • 开源、性能优秀,稳定性保障
    • 提供可靠性消息投递模式(confirm) 、返回模式(return)
    • 与SpringAMQP完美的整合、API丰富
    • 集群模式丰富,表达式配置,HA模式,镜像队列模型
    • 保证数据不丢失的前提做到高可靠性、可用性

    RabbitMQ高性能的原因

    • 主要原因是因为RabbitMQ使用Erlang语言编写,Erlang语言最初在于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能是非常优秀的
    • Erlang的优点:Erlang有着和原生Socket一样的延迟

    AMQP高级消息队列协议与模型

    什么是AMQP高级消息队列协议:

    • AMQP全称是:Advanced Message Queuing Protocol,所以AMQP翻译过来就是:高级消息队列协议。AMQP定义:是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    AMQP协议模型:


    image.png

    AMQP核心概念

    • Server:又称Broker, 接受客户端的连接,实现AMQP实体服务
    • Connection:连接,应用程序与Broker的网络连接
    • Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务
    • Message:消息,服务器和应用程序之间传送的数据,由PropertiesBody组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容
    • Virtual host:虚拟主机,用于进行逻辑隔离,就有点类似于NameSpace或Group的概念,是最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue
    • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
    • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key
    • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息
    • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者

    RabbitMQ整体架构与消息流转

    RabbitMQ整体架构图:


    image.png

    RabbitMQ消息流转图:


    image.png

    RabbitMQ环境安装

    官方下载地址:

    我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:

    例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:

    [root@rabbitmq01 ~]# cd /usr/local/src
    [root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
    [root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
    [root@rabbitmq01 /usr/local/src]# ls
    erlang-23.1.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.9-1.el7.noarch.rpm
    [root@rabbitmq01 /usr/local/src]# 
    

    使用yum命令进行安装,因为yum可自动解决依赖关系:

    [root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
    [root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm
    

    RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:

    将配置文件放到/etc/rabbitmq目录下:

    [root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf
    

    修改配置文件:

    [root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
    # 允许默认用户被外部网络访问
    loopback_users.guest = false
    

    完成配置后,启动RabbitMQ Server:

    [root@rabbitmq01 ~]# rabbitmq-server start &
    

    检查端口是否正常监听,5672是RabbitMQ的默认端口号:

    [root@rabbitmq01 ~]# netstat -lntp |grep 5672
    tcp        0      0 0.0.0.0:25672           0.0.0.0:*           LISTEN      1922/beam.smp       
    tcp6       0      0 :::5672                 :::*                LISTEN      1922/beam.smp       
    [root@rabbitmq01 ~]# 
    

    启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:

    [root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management
    

    使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest

    image.png

    登录成功,进入到管控台首页:


    image.png

    rabbitmqctl命令行操作

    rabbitmqctl基础操作命令:

    # 关闭应用
    rabbitmqctl stop_app
    
    # 启动应用
    rabbitmqctl start_app
    
    # 节点状态
    rabbitmqctl status
    
    # 添加用户
    rabbitmqctl add user username password
    
    # 列出所有用户
    rabbitmqctl list users
    
    # 删除用户
    rabbitmqctl delete_user username
    
    # 清除用户权限
    rabbitmqctl clear_permissions -p vhostpath username
    
    # 列出用户权限
    rabbitmqctl list_user_permissions username
    
    # 修改密码
    rabbitmqctl change_password username newpassword
    
    # 设置用户权限
    rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"
    
    # 创建虚拟主机
    rabbitmqctl add vhost vhostpath
    
    # 列出所有虚拟主机
    rabbitmqctl list_vhosts
    
    # 列出虚拟主机上所有权限
    rabbitmqctl list_permissions -p vhostpath
    
    # 删除虚拟主机
    rabbitmqctl delete_vhost vhostpath
    
    # 查看所有队列信息
    rabbitmqctl list_queues
    
    # 清除队列里的消息
    rabbitmqctl -p vhostpath purge_queue blue
    

    rabbitmqctl高级操作命令:

    # 移除所有数据,要在rabbitmqctIl stop_app之后使用
    rabbitmqctl reset
    
    # 组成集群命令
    rabbitmqctl join_cluster <clusternode> [--ram]
    
    # 查看集群状态
    rabbitmqctl cluster_status
    
    # 修改集群节点的存储形式
    rabbitmqctl change_cluster_node_type disc | ram
    
    # 忘记节点(摘除节点)
    rabbitmqctl forget cluster_node [--offline]
    
    # 修改节点名称
    rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] 
    

    生产者消费者代码示例

    创建一个Maven工程,在pom文件中添加如下依赖:

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.10.0</version>
    </dependency>
    

    生产者代码示例:

    package com.zj.rabbitmq.learn.basic;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.SneakyThrows;
    
    /**
     * 生产者
     *
     * @author 01
     * @date 2020-11-23
     **/
    public class MyProducer {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                for (int i = 0; i < 5; i++) {
                    // 通过Channel发送数据
                    String msg = "Hello RabbitMQ!";
                    // 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
                    channel.basicPublish("", "test001", null, msg.getBytes());
                }
            }
        }
    }
    

    消费者代码示例:

    package com.zj.rabbitmq.learn.basic;
    
    import com.rabbitmq.client.*;
    import lombok.SneakyThrows;
    
    /**
     * 消费者
     *
     * @author 01
     * @date 2020-11-23
     **/
    public class MyConsumer {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                // 声明一个队列,队列不存在会自动创建
                channel.queueDeclare("test001", true, false, false, null);
                // 创建消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) {
                        String message = new String(body);
                        System.out.println("Received: " + message);
                    }
                };
    
                // 持续监听,消费消息
                while (true){
                    channel.basicConsume("test001", true, consumer);
                    Thread.sleep(1000);
                }
            }
        }
    }
    

    先运行消费者,再运行生产者,此时消费者控制台输出如下:


    image.png

    关于交换机

    Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:


    image.png

    交换机属性:

    • Name:交换机名称
    • Type:交换机类型direct、topic、 fanout、 headers
    • Durability:是否需要持久化,true为持久化
    • Auto Delete:当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
    • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False
    • Arguments:扩展参数,用于扩展AMQP协议自制定化使用

    Direct Exchange

    • 所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
    • 注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃
    image.png

    生产者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.SneakyThrows;
    
    public class ProducerOfDirectExchange {
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ of Direct Exchange!";
                channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            }
        }
    }
    

    消费者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.*;
    import lombok.SneakyThrows;
    
    public class ConsumerOfDirectExchange {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
    
                String exchangeName = "test_direct_exchange";
                String exchangeType = "direct";
                String queueName = "test_direct_queue";
                String routingKey = "test.direct";
    
                // 声明一个direct类型的Exchange
                channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
                // 声明一个队列,队列不存在会自动创建
                channel.queueDeclare(queueName, true, false, false, null);
                // 将队列绑定到指定的Exchange上
                channel.queueBind(queueName, exchangeName, routingKey);
    
                // 创建消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) {
                        String message = new String(body);
                        System.out.println("Received: " + message);
                    }
                };
    
                // 持续监听,消费消息
                while (true) {
                    channel.basicConsume(queueName, true, consumer);
                    Thread.sleep(1000);
                }
            }
        }
    }
    

    Topic Exchange

    • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
    • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
    • 可以使用通配符进行模糊匹配:
      • 符号 "#" 匹配一个或多个词
      • 符号 "*" 匹配不多不少一个词
      • 例如:
        • "log.#" 能够匹配到 "log.info.oa"
        • "log.*" 只会匹配到 "log.error"
    image.png

    生产者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.SneakyThrows;
    
    public class ProducerOfTopicExchange {
        
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ of Topic Exchange!";
                channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
                channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
                channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
            }
        }
    }
    

    消费者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.*;
    import lombok.SneakyThrows;
    
    public class ConsumerOfTopicExchange {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
    
                String exchangeName = "test_topic_exchange";
                String exchangeType = "topic";
                String queueName = "test_topic_queue";
                //String routingKey = "user.*";
                String routingKey = "user.#";
    
                // 声明一个topic类型的Exchange
                channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
                // 声明一个队列,队列不存在会自动创建
                channel.queueDeclare(queueName, true, false, false, null);
                // 将队列绑定到指定的Exchange上
                channel.queueBind(queueName, exchangeName, routingKey);
    
                // 创建消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) {
                        String message = new String(body);
                        System.out.println("Received: " + message);
                    }
                };
    
                // 持续监听,消费消息
                while (true) {
                    channel.basicConsume(queueName, true, consumer);
                    Thread.sleep(1000);
                }
            }
        }
    }
    

    Fanout Exchange

    • 不处理路由键,只需要简单的将队列绑定到交换机上
    • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
    • Fanout交换机转发消息是最快的
    image.png

    生产者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.SneakyThrows;
    
    public class ProducerOfFanoutExchange {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            String exchangeName = "test_fanout_exchange";
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                for (int i = 0; i < 10; i++) {
                    // 通过Channel发送数据
                    String msg = "Hello RabbitMQ of Fanout Exchange!";
                    channel.basicPublish(exchangeName, "", null, msg.getBytes());
                }
            }
        }
    }
    

    消费者代码示例:

    package com.zj.rabbitmq.learn.exchange;
    
    import com.rabbitmq.client.*;
    import lombok.SneakyThrows;
    
    public class ConsumerOfFanoutExchange {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
    
                String exchangeName = "test_fanout_exchange";
                String exchangeType = "fanout";
                String queueName = "test_fanout_queue";
                // 不设置routingKey
                String routingKey = "";
    
                // 声明一个fanout类型的Exchange
                channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
                // 声明一个队列,队列不存在会自动创建
                channel.queueDeclare(queueName, true, false, false, null);
                // 将队列绑定到指定的Exchange上
                channel.queueBind(queueName, exchangeName, routingKey);
    
                // 创建消费者
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) {
                        String message = new String(body);
                        System.out.println("Received: " + message);
                    }
                };
    
                // 持续监听,消费消息
                while (true) {
                    channel.basicConsume(queueName, true, consumer);
                    Thread.sleep(1000);
                }
            }
        }
    }
    

    绑定、队列、消息、虚拟主机

    Binding - 绑定:

    • Exchange和Exchange、Queue之 间的连接关系
    • Binding中可以包含RoutingKey或者参数

    Queue - 消息队列:

    • 消息队列,实际存储消息数据
    • Durability:是否持久化。Durable:是,Transient:否
    • Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

    Message - 消息:

    • 服务器和应用程序之间传送的数据
    • 本质上就是一段数据,由Properties和Payload(Body)组成
    • 常用属性:delivery mode、headers(自定义属性)
    • Message其他属性:
      • content_type、content_encoding、priority
      • correlation id、reply_to、expiration、message_id
      • timestamp、type、 user_id、app_id、 cluster_id

    设置Message属性代码示例:

    package com.zj.rabbitmq.learn.message;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import lombok.SneakyThrows;
    
    import java.util.HashMap;
    import java.util.Map;
    
    class MyProducer {
    
        @SneakyThrows
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.243.164");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Map<String, Object> headers = new HashMap<>();
            headers.put("a", "1");
            headers.put("b", "2");
            // 自定义Message的一些属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    // 持久化模式
                    .deliveryMode(2)
                    // 消息的编码格式
                    .contentEncoding("UTF-8")
                    // 消息过期时间
                    .expiration("15000")
                    // 设置消息的头
                    .headers(headers)
                    .build();
    
            // 通过连接工厂创建连接
            try (Connection connection = factory.newConnection();
                 // 通过连接创建一个Channel
                 Channel channel = connection.createChannel()) {
                for (int i = 0; i < 5; i++) {
                    // 通过Channel发送数据
                    String msg = "Hello RabbitMQ!";
                    // 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
                    channel.basicPublish("", "test001", properties, msg.getBytes());
                }
            }
        }
    }
    

    Virtual host - 虚拟主机:

    • 虚拟地址,用于进行逻辑隔离,最上层的消息路由
    • 一个Virtual Host里面可以有若干个Exchange和Queue
    • 同一个Virtual Host里面不能有相同名称的Exchange或Queue

    相关文章

      网友评论

          本文标题:快速入门RabbitMQ核心概念

          本文链接:https://www.haomeiwen.com/subject/kxyviktx.html