美文网首页
RabbitMQ Shovel

RabbitMQ Shovel

作者: PC_Repair | 来源:发表于2019-06-19 15:18 被阅读0次

与 Federation 具备的数据转发功能类似, Shovel 能够可靠、持续地从一个Broker 中的队列(作为源端,即source )拉取数据并转发至另一个Broker 中的交换器(作为目的端,即destination )。
作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。

Shovel 的原理

Shovel的结构.PNG

==通常情况下,使用 Shovel 时配置队列作为源端,交换器作为目的端==,如上图所示。

Shovel 的使用

Shovel 插件默认也在 RabbitMQ 的发布包中,执行 rabbitmq-plugins enable rabbitmq_shovel命令可以开启 Shovel 功能。

开启 rabbitmq shovel management 插件之后, 在 RabbitMQ 的管理界面中" Admin "的右侧会多出"Shovel Status" 和" Shovel Management " 两个 Tab 页。rabbitmq-plugins enable rabbitmq_shovel_management

开启shovel_management.PNG

Shovel 既可以部署在源端,也可以部署在目的端。有两种方式可以部署 Shovel:

  • 静态方式:在 rabbitmq.config 配置文件中设置
  • 动态方式:通过 Runtime Parameter 设置

RabbitMQ Web 管理界面配置:

add_shovel.PNG

注意:URI 的 写法,%2f 用来转义

amqp://username:passwd@IP:端口/%2fVhost

测试代码:

  • RabbitMQSender
public class RabbitMQSender {
    private final static String QUEUE_NAME = "shovel.queue1";
    private final static String DUHFMQ01 = "10.224.162.189";
    
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message;

            for (int i = 0; i < 10; i++) {
                message = System.currentTimeMillis() + " hello " + i;
                System.out.println(i + " send " + message);
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                Thread.sleep(30);
            }
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • RabbitMQReceiver
public class RabbitMQReceiver {
    private final static String QUEUE_NAME = "shovel.queue1";
    private final static String EXCHANGE_NAME = "shovel.test.exchange1";
    private final static String ROUTING_KEY = "shovel.test.exchange";
    private final static String DUHFMQ01 = "10.225.20.237";
    private final static String DUHFMQ02 = "10.225.20.231";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(2);

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01), new Address(DUHFMQ02)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            // queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
            // bind
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            System.out.println("Queue Receiver Start!");

            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("Recv msg: " + msg);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            boolean autoAck = true;
            while (true) {
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        }
    }
}

配置双向 shovel 测试:

  • 数据在两端之间来会发送
  • 当消费时,可以正常消费完,不出现冗余

总结

  • 在使用 shovel 插件时,只需要在 source 节点进行配置,destination 节点不需要配置;同理,只需要在 source 节点上使能 shovel 插件,destination 节点无需使能该插件;
  • 在 shovel 正常工作时,对于 source 节点来说,增加了一条用于 consumer 的 TCP 连接;对于 destination 节点来说,增加了一条用于 producer 的 TCP 连接,和普通客户端的连接行为没什么不同;

案例:消息堆积的治理

当某个队列中的消息堆积严重时,比如超过某个设定的阈值,就可以通过 Shovel 将队列中的消息移交给另一个集群。

消息堆积的治理.PNG
  • 情形 1:当检测到当前运行集群 cluster1 中的队列 queue1 中有严重消息堆积,比如通过/api/queues/vhost/ name 接口获取到队列的消息个数(messages) 超过2 千万或者消息占用大小(messages bytes) 超过10GB 时,就启用 shovel1 将队列 queue1 中的消息转发至备份集群 cluster2 中的队列queue2 。

  • 情形 2 :紧随情形1,当检测到队列queue1 中的消息个数低于1 百万或者消息占用大小低于1GB 时就停止shovel1 ,然后让原本队列 queue1 中的消费者慢慢处理剩余的堆积。

  • 情形 3:当检测到队列 queue1 中的消息个数低于10 万或者消息占用大小低于100MB时,就开启 shovel2 将队列 queue2 中暂存的消息返还给队列queue1 。

  • 情形 4:紧随情形3 ,当检测到队列queuel 中的消息个数超过 1百万或者消息占用大小高于1GB 时就将shovel2 停掉。

相关文章

网友评论

      本文标题:RabbitMQ Shovel

      本文链接:https://www.haomeiwen.com/subject/vgdkqctx.html