美文网首页工具推荐
RabbitMQ入门使用

RabbitMQ入门使用

作者: 依弗布德甘 | 来源:发表于2020-03-01 13:25 被阅读0次

    RabbitMQ

    • RabbitMQ主要基于AMQP协议实现
      AMQP (Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
    RabbitMQ模型架构
    • Producer

      生产者,投递消息的一方。用于创建消息,然后发布到RabbitMQ中
      消息一般分为两个部分:消息体 、附加信息

      • 消息体一般是一个带有业务逻辑结构的数据,比如JSON字符串
      • 附加信息用来表述这条消息,如交换器名称、路由键和一些自定义属性等等
    • Broker

      消息中间件的服务节点;单台机器部署Broker就相当于是整个MQ服务器

    • Virtual Host

      虚拟主机,表示一批交换器、消息队列和相关对象;虚拟主机是共享相同身份认证和加密环境的独立服务器域

    • Channel

      频道或信道,是建立在Connection连接之上的一种轻量级的连接;一个Connection可以创建任意数量的Channel

    大部分操作都是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等

    • RoutingKey

      路由键;生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则;RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用

    • Exchange

      交换器,生产者将消费发送到Exchange,再由它将消息路由到一个或多个队列中,如果路由不到,或返回或直接丢弃

      • fanout:扇形交换机,会把所有消息路由到与之绑定的所有队列中
      • direct:直连交换机,会根据BindingKey与RoutingKey匹配发送消息
      • topic:主题交换机,与direct类似,但是可以通过通配符模糊匹配
      • headers:头交换机,根据消息头部中带的值进行匹配
    • Queue

      队列,是RabbitMQ的内部对象,用于存储消息

    • Binding

      绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样交换器就知道如何正确的将消息路由到哪个队列中

    • Consumer

      消费者,接受消息的一方;消费者连接到RabbitMQ服务器并定于到队列上

    RabbitMQ运转流程

    业务运转流程 架构运转流程
    • 生产者发送消息的过程:
      1. 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
      2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
      3. 生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等
      4. 生产者通过路由键将交换器和队列绑定起来
      5. 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
      6. 相应的交换器根据接受到的路由键排查相匹配的队列
      7. 如果找到,则将从生产者发送过来的消息存入相应的队列中
      8. 如果没找到,则根据生产者配置的属性选择丢弃还是回退给生产者
      9. 关闭信道,关闭连接
    • 消费者接收消息的过程:
      1. 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
      2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
      3. 等待RabbitMQ Broker 回应并投递相应队列中的消息,接收消息
      4. 消费者确认(ack)接收到的消息
      5. RabbitMQ 从队列中删除相应已被确认的消息
      6. 关闭信道、关闭连接


    RabbitMQ 安装和使用

    一、安装依赖环境

    1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本

    2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。

    3. 复制下载地址后,使用wget命令下载

      wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
      
    4. 安装 Erlang

      sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
      
    5. 安装 socat

      sudo yum install -y socat
      

    二、安装RabbitMQ

    1. 官方下载页面找到CentOS7版本的下载链接,下载rpm安装包

      wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
      

      提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

    2. 安装RabbitMQ

      sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
      

    三、启动和关闭

    • 启动服务

      sudo systemctl start rabbitmq-server
      
    • 查看状态

      sudo systemctl status rabbitmq-server
      
    • 停止服务

      sudo systemctl stop rabbitmq-server
      
    • 设置开机启动

      sudo systemctl enable rabbitmq-server
      

    四、开启Web管理插件

    1. 开启插件

      rabbitmq-plugins enable rabbitmq_management
      

      说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

    2. 添加用户

      rabbitmqctl add_user admin admin
      
    3. 为用户分配操作权限

      rabbitmqctl set_user_tags admin administrator
      
    4. 为用户分配资源权限

      rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      

    五、防火墙添加端口

    • RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙
    1. 添加端口

      sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
      
    2. 重启防火墙

      sudo firewall-cmd --reload
      

    多机多节点集群部署

    一、 环境准备

    • 准备三台安装好RabbitMQ 的机器,安装方法见 安装步骤

      • 10.10.1.41
      • 10.10.1.42
      • 10.10.1.43

      提示:如果使用虚拟机,可以在一台VM上安装好RabbitMQ后,创建快照,从快照创建链接克隆,会节省很多磁盘空间

    二、修改配置文件

    1. 修改10.10.1.41机器上的/etc/hosts文件

      sudo vim /etc/hosts
      
    2. 添加IP和节点名

      10.10.1.41 node1
      10.10.1.42 node2
      10.10.1.43 node3
      
    3. 修改对应主机的hostname

    hostname node1
    hostname node2
    hostname node3
    
    1. 10.10.1.41上的hosts文件复制到另外两台机器上
      sudo scp /etc/hosts root@node2:/etc/
      sudo scp /etc/hosts root@node3:/etc/
      
      说明:命令中的root是目标机器的用户名,命令执行后,可能会提示需要输入密码,输入对应用户的密码就行了
    2. 10.10.1.41上的/var/lib/rabbitmq/.erlang.cookie文件复制到另外两台机器上
      scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
      scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
      
      提示:如果是通过克隆的VM,可以省略这一步

    三、防火墙添加端口

    • 给每台机器的防火墙添加端口
    1. 添加端口

      sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
      
    2. 重启防火墙

      sudo firewall-cmd --reload
      

    四、启动RabbitMQ

    1. 启动每台机器的RabbitMQ

      sudo systemctl start rabbitmq-server
      

      或者

      rabbitmq-server -detached
      
    2. 10.10.1.42加入到集群

      # 停止RabbitMQ 应用
      rabbitmqctl stop_app
      # 重置RabbitMQ 设置
      rabbitmqctl reset
      # 加入到集群
      rabbitmqctl join_cluster rabbit@node1 --ram
      # 启动RabbitMQ 应用
      rabbitmqctl start_app
      
    3. 查看集群状态,看到running_nodes,[rabbit@node1,rabbit@node2]表示节点启动成功

      rabbitmqctl cluster_status
      

      提示:在管理界面可以更直观的看到集群信息

    4. 10.10.1.43加入到集群

      # 停止 RabbitMQ 应用
      rabbitmqctl stop_app
      # 重置 RabbitMQ 设置
      rabbitmqctl reset
      # 节点加入到集群
      rabbitmqctl join_cluster rabbit@node1 --ram
      # 启动 RabbitMQ 应用
      rabbitmqctl start_app
      
    5. 重复地3步,查看集群状态


    单机多节点部署

    一、环境准备

    • 准备一台已经安装好RabbitMQ 的机器,安装方法见 安装步骤
      • 10.10.1.41

    二、启动RabbitMQ

    1. 在启动前,先修改RabbitMQ 的默认节点名(非必要),在/etc/rabbitmq/rabbitmq-env.conf增加以下内容

      # RabbitMQ 默认节点名,默认是rabbit
      NODENAME=rabbit1
      
    2. RabbitMQ 默认是使用服务的启动的,单机多节点时需要改为手动启动,先停止运行中的RabbitMQ 服务

      sudo systemctl stop rabbitmq-server
      
    3. 启动第一个节点

      rabbitmq-server -detached
      
    4. 启动第二个节点

      RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
      
    5. 启动第三个节点

      RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached
      
    6. 将rabbit2加入到集群

      # 停止 rabbit2 的应用
      rabbitmqctl -n rabbit2 stop_app
      # 重置 rabbit2 的设置
      rabbitmqctl -n rabbit2 reset
      # rabbit2 节点加入到 rabbit1的集群中
      rabbitmqctl -n rabbit2 join_cluster rabbit1 --ram
      # 启动 rabbit2 节点
      rabbitmqctl -n rabbit2 start_app
      
    7. 将rabbit3加入到集群

      # 停止 rabbit3 的应用
      rabbitmqctl -n rabbit3 stop_app
      # 重置 rabbit3 的设置
      rabbitmqctl -n rabbit3 reset
      # rabbit3 节点加入到 rabbit1的集群中
      rabbitmqctl -n rabbit3 join_cluster rabbit1 --ram
      # 启动 rabbit3 节点
      rabbitmqctl -n rabbit3 start_app
      
    8. 查看集群状态,看到{running_nodes,[rabbit3@node1,rabbit2@node1,rabbit1@node1]}说明节点已启动成功。

      rabbitmqctl cluster_status
      

      提示:在管理界面可以更直观的看到集群信息

    三、防火墙添加端口

    • 需要将每个节点的端口都添加到防火墙
    1. 添加端口

      sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=5673/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=25673/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=15673/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=5674/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=25674/tcp --permanent
      sudo firewall-cmd --zone=public --add-port=15674/tcp --permanent
      
    2. 重启防火墙

      sudo firewall-cmd --reload
      

    镜像队列模式集群

    • 镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments

    • 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群

    • 开启镜像队列模式需要在管理页面添加策略,添加方式:

      1. 进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy

      2. 在表单中填入:

              name: ha-all
           Pattern: ^
          Apply to: Queues
          Priority: 0
        Definition: ha-mode = all
        

        参数说明

        name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息

        Apply to:策略应用到什么对象上

        Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式)

        Priority:优先级,数值越大,优先级越高,相同优先级取最后一个

        Definition:策略定义的类容,对于镜像队列的配置来说,只需要包含3个部分: ha-modeha-paramsha-sync-mode。其中,ha-sync-mode是同步的方式,自动还是手动,默认是自动。ha-modeha-params 组合使用。组合方式如下:

      ha-mode ha-params 说明
      all (empty) 队列镜像到集群类所有节点
      exactly count 队列镜像到集群内指定数量的节点。如果集群内节点数少于此值,队列将会镜像到所有节点。如果大于此值,而且一个包含镜像的节点停止,则新的镜像不会在其它节点创建。
      nodes nodename 队列镜像到指定节点,指定的节点不在集群中不会报错。当队列申明时,如果指定的节点不在线,则队列会被创建在客户端所连接的节点上。
    • 镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式

    • 但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。


    简单代码示例

    JAVA依赖

      <dependency>
           <groupId>com.rabbitmq</groupId>
           <artifactId>amqp-client</artifactId>
           <version>5.5.1</version>
      </dependency>
    

    Spring中依赖

      <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
           <version>2.1.1.RELEASE</version>
      </dependency>
      <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
           <version>2.1.1.RELEASE</version>
      </dependency>
    
    
    生产者
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 简单队列生产者
     * 使用RabbitMQ的默认交换器发送消息
     */
    public class Producer {
    
        public static void main(String[] args) {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("192.168.100.242");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection("生产者");
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
    
                /**
                 * 5、声明(创建)队列
                 * 如果队列不存在,才会创建
                 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
                 *
                 * queueDeclare参数说明:
                 * @param queue 队列名称
                 * @param durable 队列是否持久化
                 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
                 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
                 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
                 */
                channel.queueDeclare("queue1", false, false, false, null);
    
                // 消息内容
                String message = "Hello World!";
                // 6、发送消息
                channel.basicPublish("", "queue1", null, message.getBytes());
                System.out.println("消息已发送!");
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 7、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                // 8、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    
    消费者
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 简单队列消费者
     */
    public class Consumer {
    
        public static void main(String[] args) {
            // 1、创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 2、设置连接属性
            factory.setHost("192.168.100.242");
            factory.setUsername("admin");
            factory.setPassword("admin");
    
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 3、从连接工厂获取连接
                connection = factory.newConnection("消费者");
    
                // 4、从链接中创建通道
                channel = connection.createChannel();
    
                /**
                 * 5、声明(创建)队列
                 * 如果队列不存在,才会创建
                 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
                 *
                 * queueDeclare参数说明:
                 * @param queue 队列名称
                 * @param durable 队列是否持久化
                 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
                 *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
                 *                  一般在队列和交换器绑定时使用
                 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
                 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
                 */
                channel.queueDeclare("queue1", false, false, false, null);
    
                // 6、定义收到消息后的回调
                DeliverCallback callback = new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                    }
                };
                // 7、监听队列
                channel.basicConsume("queue1", true, callback, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                    }
                });
    
                System.out.println("开始接收消息");
                System.in.read();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 8、关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                // 9、关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    

    Spring中使用RabbitMQ

    创建队列
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class AppConfiguration {
    
        @Bean
        public Queue helloSpring() {
            return new Queue("spring.cluster");
        }
    }
    
    
    生产者
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Import;
    
    @Configuration
    @EnableAutoConfiguration
    @Import(AppConfiguration.class)
    public class ProducerApp {
    
        private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);
    
        @Autowired
        private RabbitTemplate template;
    
        @Autowired
        private Queue helloSpring;
    
        @Bean
        CommandLineRunner runner() {
            return args -> {
                template.convertAndSend(helloSpring.getName(), "Hello Spring");
                logger.info("消息已发送");
            };
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ProducerApp.class, args);
        }
    
    }
    
    
    消费者
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Import;
    import org.springframework.messaging.handler.annotation.Payload;
    
    @Configuration
    @EnableAutoConfiguration
    @RabbitListener(queues = "spring.cluster")
    @Import(AppConfiguration.class)
    public class ConsumerApp {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerApp.class);
    
        @RabbitHandler
        public void receive(@Payload String msg) {
            logger.info("收到消息:" + msg);
        }
    
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApp.class, args);
        }
    }
    
    

    相关文章

      网友评论

        本文标题:RabbitMQ入门使用

        本文链接:https://www.haomeiwen.com/subject/scrfkhtx.html