美文网首页工具推荐
RabbitMQ入门使用

RabbitMQ入门使用

作者: 依弗布德甘 | 来源:发表于2020-03-01 13:25 被阅读0次

RabbitMQ

  • RabbitMQ主要基于AMQP协议实现
    AMQP (Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
RabbitMQ模型架构
  • Producer

    生产者,投递消息的一方。用于创建消息,然后发布到RabbitMQ中
    消息一般分为两个部分:消息体 、附加信息

    • 消息体一般是一个带有业务逻辑结构的数据,比如JSON字符串
    • 附加信息用来表述这条消息,如交换器名称、路由键和一些自定义属性等等
  • Broker

    消息中间件的服务节点;单台机器部署Broker就相当于是整个MQ服务器

  • Virtual Host

    虚拟主机,表示一批交换器、消息队列和相关对象;虚拟主机是共享相同身份认证和加密环境的独立服务器域

  • Channel

    频道或信道,是建立在Connection连接之上的一种轻量级的连接;一个Connection可以创建任意数量的Channel

大部分操作都是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等

  • RoutingKey

    路由键;生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则;RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用

  • Exchange

    交换器,生产者将消费发送到Exchange,再由它将消息路由到一个或多个队列中,如果路由不到,或返回或直接丢弃

    • fanout:扇形交换机,会把所有消息路由到与之绑定的所有队列中
    • direct:直连交换机,会根据BindingKey与RoutingKey匹配发送消息
    • topic:主题交换机,与direct类似,但是可以通过通配符模糊匹配
    • headers:头交换机,根据消息头部中带的值进行匹配
  • Queue

    队列,是RabbitMQ的内部对象,用于存储消息

  • Binding

    绑定,RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样交换器就知道如何正确的将消息路由到哪个队列中

  • Consumer

    消费者,接受消息的一方;消费者连接到RabbitMQ服务器并定于到队列上

RabbitMQ运转流程

业务运转流程 架构运转流程
  • 生产者发送消息的过程:
    1. 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
    2. 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
    3. 生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等
    4. 生产者通过路由键将交换器和队列绑定起来
    5. 生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
    6. 相应的交换器根据接受到的路由键排查相匹配的队列
    7. 如果找到,则将从生产者发送过来的消息存入相应的队列中
    8. 如果没找到,则根据生产者配置的属性选择丢弃还是回退给生产者
    9. 关闭信道,关闭连接
  • 消费者接收消息的过程:
    1. 消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)
    2. 消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
    3. 等待RabbitMQ Broker 回应并投递相应队列中的消息,接收消息
    4. 消费者确认(ack)接收到的消息
    5. RabbitMQ 从队列中删除相应已被确认的消息
    6. 关闭信道、关闭连接


RabbitMQ 安装和使用

一、安装依赖环境

  1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本

  2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。

  3. 复制下载地址后,使用wget命令下载

    wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  4. 安装 Erlang

    sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  5. 安装 socat

    sudo yum install -y socat
    

二、安装RabbitMQ

  1. 官方下载页面找到CentOS7版本的下载链接,下载rpm安装包

    wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

    提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

  2. 安装RabbitMQ

    sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

三、启动和关闭

  • 启动服务

    sudo systemctl start rabbitmq-server
    
  • 查看状态

    sudo systemctl status rabbitmq-server
    
  • 停止服务

    sudo systemctl stop rabbitmq-server
    
  • 设置开机启动

    sudo systemctl enable rabbitmq-server
    

四、开启Web管理插件

  1. 开启插件

    rabbitmq-plugins enable rabbitmq_management
    

    说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

  2. 添加用户

    rabbitmqctl add_user admin admin
    
  3. 为用户分配操作权限

    rabbitmqctl set_user_tags admin administrator
    
  4. 为用户分配资源权限

    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    

五、防火墙添加端口

  • RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙
  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

多机多节点集群部署

一、 环境准备

  • 准备三台安装好RabbitMQ 的机器,安装方法见 安装步骤

    • 10.10.1.41
    • 10.10.1.42
    • 10.10.1.43

    提示:如果使用虚拟机,可以在一台VM上安装好RabbitMQ后,创建快照,从快照创建链接克隆,会节省很多磁盘空间

二、修改配置文件

  1. 修改10.10.1.41机器上的/etc/hosts文件

    sudo vim /etc/hosts
    
  2. 添加IP和节点名

    10.10.1.41 node1
    10.10.1.42 node2
    10.10.1.43 node3
    
  3. 修改对应主机的hostname

hostname node1
hostname node2
hostname node3
  1. 10.10.1.41上的hosts文件复制到另外两台机器上
    sudo scp /etc/hosts root@node2:/etc/
    sudo scp /etc/hosts root@node3:/etc/
    
    说明:命令中的root是目标机器的用户名,命令执行后,可能会提示需要输入密码,输入对应用户的密码就行了
  2. 10.10.1.41上的/var/lib/rabbitmq/.erlang.cookie文件复制到另外两台机器上
    scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
    scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
    
    提示:如果是通过克隆的VM,可以省略这一步

三、防火墙添加端口

  • 给每台机器的防火墙添加端口
  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

四、启动RabbitMQ

  1. 启动每台机器的RabbitMQ

    sudo systemctl start rabbitmq-server
    

    或者

    rabbitmq-server -detached
    
  2. 10.10.1.42加入到集群

    # 停止RabbitMQ 应用
    rabbitmqctl stop_app
    # 重置RabbitMQ 设置
    rabbitmqctl reset
    # 加入到集群
    rabbitmqctl join_cluster rabbit@node1 --ram
    # 启动RabbitMQ 应用
    rabbitmqctl start_app
    
  3. 查看集群状态,看到running_nodes,[rabbit@node1,rabbit@node2]表示节点启动成功

    rabbitmqctl cluster_status
    

    提示:在管理界面可以更直观的看到集群信息

  4. 10.10.1.43加入到集群

    # 停止 RabbitMQ 应用
    rabbitmqctl stop_app
    # 重置 RabbitMQ 设置
    rabbitmqctl reset
    # 节点加入到集群
    rabbitmqctl join_cluster rabbit@node1 --ram
    # 启动 RabbitMQ 应用
    rabbitmqctl start_app
    
  5. 重复地3步,查看集群状态


单机多节点部署

一、环境准备

  • 准备一台已经安装好RabbitMQ 的机器,安装方法见 安装步骤
    • 10.10.1.41

二、启动RabbitMQ

  1. 在启动前,先修改RabbitMQ 的默认节点名(非必要),在/etc/rabbitmq/rabbitmq-env.conf增加以下内容

    # RabbitMQ 默认节点名,默认是rabbit
    NODENAME=rabbit1
    
  2. RabbitMQ 默认是使用服务的启动的,单机多节点时需要改为手动启动,先停止运行中的RabbitMQ 服务

    sudo systemctl stop rabbitmq-server
    
  3. 启动第一个节点

    rabbitmq-server -detached
    
  4. 启动第二个节点

    RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
    
  5. 启动第三个节点

    RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached
    
  6. 将rabbit2加入到集群

    # 停止 rabbit2 的应用
    rabbitmqctl -n rabbit2 stop_app
    # 重置 rabbit2 的设置
    rabbitmqctl -n rabbit2 reset
    # rabbit2 节点加入到 rabbit1的集群中
    rabbitmqctl -n rabbit2 join_cluster rabbit1 --ram
    # 启动 rabbit2 节点
    rabbitmqctl -n rabbit2 start_app
    
  7. 将rabbit3加入到集群

    # 停止 rabbit3 的应用
    rabbitmqctl -n rabbit3 stop_app
    # 重置 rabbit3 的设置
    rabbitmqctl -n rabbit3 reset
    # rabbit3 节点加入到 rabbit1的集群中
    rabbitmqctl -n rabbit3 join_cluster rabbit1 --ram
    # 启动 rabbit3 节点
    rabbitmqctl -n rabbit3 start_app
    
  8. 查看集群状态,看到{running_nodes,[rabbit3@node1,rabbit2@node1,rabbit1@node1]}说明节点已启动成功。

    rabbitmqctl cluster_status
    

    提示:在管理界面可以更直观的看到集群信息

三、防火墙添加端口

  • 需要将每个节点的端口都添加到防火墙
  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5674/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25674/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15674/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

镜像队列模式集群

  • 镜像队列属于RabbitMQ 的高可用方案,见:https://www.rabbitmq.com/ha.html#mirroring-arguments

  • 通过前面的步骤搭建的集群属于普通模式集群,是通过共享元数据实现集群

  • 开启镜像队列模式需要在管理页面添加策略,添加方式:

    1. 进入管理页面 -> Admin -> Policies(在页面右侧)-> Add / update a policy

    2. 在表单中填入:

            name: ha-all
         Pattern: ^
        Apply to: Queues
        Priority: 0
      Definition: ha-mode = all
      

      参数说明

      name: 策略名称,如果使用已有的名称,保存后将会修改原来的信息

      Apply to:策略应用到什么对象上

      Pattern:策略应用到对象时,对象名称的匹配规则(正则表达式)

      Priority:优先级,数值越大,优先级越高,相同优先级取最后一个

      Definition:策略定义的类容,对于镜像队列的配置来说,只需要包含3个部分: ha-modeha-paramsha-sync-mode。其中,ha-sync-mode是同步的方式,自动还是手动,默认是自动。ha-modeha-params 组合使用。组合方式如下:

    ha-mode ha-params 说明
    all (empty) 队列镜像到集群类所有节点
    exactly count 队列镜像到集群内指定数量的节点。如果集群内节点数少于此值,队列将会镜像到所有节点。如果大于此值,而且一个包含镜像的节点停止,则新的镜像不会在其它节点创建。
    nodes nodename 队列镜像到指定节点,指定的节点不在集群中不会报错。当队列申明时,如果指定的节点不在线,则队列会被创建在客户端所连接的节点上。
  • 镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会低于普通模式

  • 但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启动后才能被消费。


简单代码示例

JAVA依赖

  <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.5.1</version>
  </dependency>

Spring中依赖

  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
       <version>2.1.1.RELEASE</version>
  </dependency>
  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
       <version>2.1.1.RELEASE</version>
  </dependency>

生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单队列生产者
 * 使用RabbitMQ的默认交换器发送消息
 */
public class Producer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("192.168.100.242");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection("生产者");

            // 4、从链接中创建通道
            channel = connection.createChannel();

            /**
             * 5、声明(创建)队列
             * 如果队列不存在,才会创建
             * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
             * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 消息内容
            String message = "Hello World!";
            // 6、发送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已发送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单队列消费者
 */
public class Consumer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("192.168.100.242");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection("消费者");

            // 4、从链接中创建通道
            channel = connection.createChannel();

            /**
             * 5、声明(创建)队列
             * 如果队列不存在,才会创建
             * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
             *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
             *                  一般在队列和交换器绑定时使用
             * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 6、定义收到消息后的回调
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 7、监听队列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 8、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 9、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Spring中使用RabbitMQ

创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfiguration {

    @Bean
    public Queue helloSpring() {
        return new Queue("spring.cluster");
    }
}

生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@EnableAutoConfiguration
@Import(AppConfiguration.class)
public class ProducerApp {

    private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue helloSpring;

    @Bean
    CommandLineRunner runner() {
        return args -> {
            template.convertAndSend(helloSpring.getName(), "Hello Spring");
            logger.info("消息已发送");
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }

}

消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@EnableAutoConfiguration
@RabbitListener(queues = "spring.cluster")
@Import(AppConfiguration.class)
public class ConsumerApp {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerApp.class);

    @RabbitHandler
    public void receive(@Payload String msg) {
        logger.info("收到消息:" + msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
}

相关文章

网友评论

    本文标题:RabbitMQ入门使用

    本文链接:https://www.haomeiwen.com/subject/scrfkhtx.html