美文网首页
RabbitMQ 镜像队列

RabbitMQ 镜像队列

作者: PC_Repair | 来源:发表于2019-06-14 16:32 被阅读0次

镜像队列的使用需要在集群的基础上操作,Rabbitmq 集群的搭建参考如下:

https://www.jianshu.com/p/7cf2ad01c422

镜像队列的基本概念与介绍

队列进程及其内容仅仅维持在单个节点之上,所以一个节点的失效表现为其对应的队列不可用。

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上以保证服务的可用性

在通常的用法中,针对每个队列的(以下简称镜像队列)都包含一个主节点(master)和若干个从节点(slave),相应结构参考图如下:

主从结构.PNG

slave 会准确地按照 master 执行命令地顺序进行动作,故 slave 和 master 上维护的状态应该是相同的。如果 master 由于某种原因失效,那么“资历最老”(根基于 slave 加入 cluster 的时间排序)的 slave 会被提升为新的 master。发送到镜像队列的所有消息会被同时发往 master 和所有的 slave 上,如果此时 master 挂掉了,消息还会在 slave 上,这样 slave 提升为 master 的时候消息也不会丢失。

除发送消息(Basic.Publish)外的所有动作都只会向 master 发送,然后再由master 将命令执行的结果广播给各个 slave。

如果消费者与 slave 建立连接并进行订阅消费,其实质都是从 master 上获取消息,只不过看似是从 slave 上消费而已。比如消费者与 slave 建立了 TCP 连接之后执行一个 Basic.Get 操作,那么首先是由 slave 将Basic.Get 请求发往 master,再由 master 准备好数据返回给 slave,最后由 slave 投递给消费者。

下图结构中,集群中的每个 Broker 节点都包含 1 个队列的 master 和 2 个队列的 slave, Q1 的负载大多都在 broker1 上,Q2 的负载大多都集中在 broker2 上,Q3 的负载大多都集中在 broker3 上,只要确保队列的 master 节点均匀散落在集群中的各个 Broker 节点即可确保很大程度的负载均衡。

集群结构.PNG

注意要点:RabbitMQ 的镜像队列同时支持 publisher confirm 和 事务两种机制。

当 slave 挂掉之后,除了与 slave 相连的客户端连接全部断开,没有其他影响。当 master 挂掉之后,会有以下连锁反应:

(1)与 master 连接的客户端连接全部断开。

(2)选举最老的 slave 作为新的 master,因为最老的 slave 与旧的 master 之间的同步状态应该是最好的。如果此时所有 slave 处于未同步状态,则未同步的消息会丢失。

(3)新的 master 重新入队所有 unack 的消息,因为新的 slave 无法区分这些 unack 的消息是否己经到达客户端,或者是 ack 信息丢失在老的 master 链路上,再或者是丢失在老的 master 组播 ack 消息到所有slave 的链路上,所以出于消息可靠性的考虑,重新入队所有unack 的消息,不过此时客户端可能会有重复消息

(4)如果客户端连接着 slave,并且 Basic.Consume 消费时指定了 x-cancel-on-ha-failover 参数,那么断开之时客户端会收到一个 Consumer Cancellation Notification 的通知,消费者客户端中会回调 Consumer 接口的 handleCancel 方法。如果未指定 x-cancel-on-ha-failover 参数,那么消费者将无法感知 master 宕机。

镜像队列的配置主要是通过添加相应的 Policy 来完成的,对于镜像队列的配置来说,definition 中需要包含 3 个部分:ha-modeha-paramsha-sync-mode

  • ha-mode:指明镜像队列的模式,有效值为all 、exactly 、nodes ,默认为all 。all 表示在集群中所有的节点上进行镜像; exactly 表示在指定个数的节点上进行镜像,节点个数由ha - params 指定; nodes 表示在指定节点上进行镜像,节点名称通过 ha-params 指定,节点的名称通常类似于 rabbit@hostname ,可以通过rabbitmqctl cluster status 命令查看到。

  • ha-params:不同的 ha-mode 配置中需要用到的参数。

  • ha-sync-mode:队列中消息的同步方式,有效值为 automatic 和 manual。

rabbitmqctl list_queues {name} slave_pids_synchronised_slave_pids :命令可以查看那些 slave 已经完成同步。

rabbitmqctl sync_queue {name}:手动方式同步一个队列。

rabbitmqctl cancel_sync_queue {name}:取消某个队列的同步操作。

当所有slave 都出现未同步状态,并且 ha-prornote- on -shutdown 设置为 when-syn ced(默认)时,如果master 因为主动原因停掉,比如通过 rabbitrnqctl stop 命令或者优雅关闭操作系统,那么slave 不会接管master,也就是此时镜像队列不可用:但是如果master 因为被动原因停掉,比如Erlang 虚拟机或者操作系统崩溃,那么slave 会接管master。这个配置项隐含的价值取向是保证消息可靠不丢失,同时放弃了可用性。如果 ha-prornote-on-shutdown 设置为always ,那么不论master 因为何种原因停止, slave 都会接管 master ,优先保证可用性,不过消息可能会丢失。

镜像队列中最后一个停止的节点会是 master启动顺序必须是 master 先启动。如果 slave 先启动,它会有30 秒的等待时间,等待 master 的启动,然后加入到集群中。如果30 秒内 master没有启动, slave 会自动停止。当所有节点因故(断电等)同时离线时,每个节点都认为自己不是最后一个停止的节点,要恢复镜像队列,可以尝试在30 秒内启动所有节点。

镜像队列实例

此处实例的操作都是在 Rabbitmq Web 管理界面操作的,所以请安装好 rabbitmq management 插件。
添加关于镜像队列的策略(policy):

mirror_queue_policy.PNG

解释:

  • Virtual host

  • Pattern:正则匹配,定义的 policy 会根据正则表达式应用到相应的交换器或者队列上,此处设置只匹配名为test.mirror 的队列。

  • Definition:配置策略的参数,ha-mode=all 表示镜像队列将会在整个集群中复制,当一个新的节点加入后,也会在这 个节点上复制一份。其他参考如下:

ha-mode ha-params 功能
all 镜像队列将会在整个集群中复制。当一个新的节点加入后,也会在这 个节点上复制一份。
exactly count 镜像队列将会在集群上复制 count 份。如果集群数量少于 count 时候,队列会复制到所有节点上。如果大于 Count 集群,有一个节点 crash 后,新进入节点也不会做新的镜像。
nodes node name 镜像队列会在 node name 中复制。如果这个名称不是集群中的一个,这不会触发错误。如果在这个 node list 中没有一个节点在线,那么这个 queue 会被声明在 client 连接的节点。
  • 策略添加之后,若匹配到相应的队列则会为队列添加该策略。
mirror_queue.PNG

个人测试总结:测试时建立了 node1 和 node2 两个节点作为一个 cluster

  • 场景1:接收消息时 node1,node2 都存活,接收完消息后 node1 宕机

当消费者去消费消息时会从选择一个存活的节点消费消息,当 node1 上线后会和 node2 同步,即 node1 中宕机前存储的消息会消失。

  • 场景2:接收消息时 node1 宕机,node2 存活,消息接收完之后 node1 上线

node1 中不会同步 node2 中存的消息,管理界面会出现以下异常显示。

mirror_queue场景2.PNG

此时当 node2 宕机,node1 存活时,消费者无法消费到消息。当 node1、node2 都存活时,消费者能正常消费消息,当未同步的消息消费完之后,镜像队列就变成同步状态。

测试代码

  • RabbitMQSender
public class RabbitMQSender {

    private final static String QUEUE_NAME = "test.mirror";
    private final static String DUHFMQ01 = "10.225.20.210";  //输入自己节点的 ip
    private final static String DUHFMQ02 = "10.225.20.211";

    public static void main(String[] args) {

        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");
        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01), new Address(DUHFMQ02)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message;

            for (int i = 0; i < 10; i++) {
                message = System.currentTimeMillis() + " hello " + i;
                System.out.println(i + " send " + message);
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                Thread.sleep(30);
            }

            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • RabbitMQReceiver
public class RabbitMQReceiver {
    private final static String QUEUE_NAME = "test.mirror";
    private final static String DUHFMQ01 = "10.225.20.210";
    private final static String DUHFMQ02 = "10.225.20.211";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(2);

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01), new Address(DUHFMQ02)};
            //Address[] address = new Address[]{new Address(DUHFMQ02)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Queue Receiver Start!");

            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("Recv msg: " + msg);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            boolean autoAck = true;
            while (true) {
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        }
    }
}

相关文章

网友评论

      本文标题:RabbitMQ 镜像队列

      本文链接:https://www.haomeiwen.com/subject/pvypfctx.html