美文网首页MQ
07-RabbitMQ持久化

07-RabbitMQ持久化

作者: 紫荆秋雪_文 | 来源:发表于2021-09-24 16:08 被阅读0次

    一、概念

    手动应答解决的是任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非预先设定不这样做。确保消息不会丢失需要做两件事:“队列和消息标记为持久化”。

    二、队列持久化

    1、队列持久化代码设置

            /**
             * 创建一个队里
             *
             * 队列名称
             * 队列是否持久化
             * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
             * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    

    2、重启生成者

    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
    
    • 队列修改参数后,需要删除掉已经存在的同名的旧队列,否则报上述错误

    3、生成消息

    /**
     * 消息生产者
     */
    public class Producer {
    
        // 队列名称
        private static final String QUEUE_NAME = "ack_queue";
    
    
        public static void main(String[] args) throws Exception {
    
            // 创建一个Channel
            Channel channel = ConnectionUtils.getConnection().createChannel();
    
            /**
             * 创建一个队里
             *
             * 队列名称
             * 队列是否持久化  true:持久化
             * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
             * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                //准备消息
                String msg = scanner.next();
    
                /**
                 * 发布消息
                 *
                 * 发送到那个交换机
                 * 路由key
                 * 其他参数
                 * 发送消息的消息体
                 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("发送消息完成:" + msg);
            }
        }
    
    }
    
    • 生成消息
    11
    发送消息完成:11
    22
    发送消息完成:22
    33
    发送消息完成:33
    44
    发送消息完成:44
    55
    发送消息完成:55
    
    • 队列 image.png
    • 现在消息队列中的消息


      image.png
    • 重启RabbitMQ image.png
    • 队列依然在 image.png
    • 消息队列中消息 image.png

    小结:队列持久化成功,但是要想队列中消息也存在需要消息也要持久化

    三、消息持久化

    1、消息生产者代码设置

     /**
       * 发布消息
       *
       * 发送到那个交换机
       * 路由key
       * 其他参数
       * 发送消息的消息体
       */
      channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    

    2、测试流程与上面队列持久化一样 image.png

    3、重启RabbitMQ服务 image.png

    image.png

    三、不公平分发

    RabbitMQ默认的分发消息使用的是轮询分发,但是在有些场景下是不妥的,向上面的测试代码中在正常情况下发送的消息11、22、33、44、55、66,Consumer1和Consumer2会平分消费上面的消息,但是由于Consumer2中处理速度低,这就造成Consumer1处理完消息空闲等待,而Consumer2还没有去不执行完消息;这就浪费了Consumer1的处理能。所以需要“不公平分发”,让“能者多劳”。

    1、在消费端代码设置

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

    2、Producer

    /**
     * 消息生产者
     */
    public class Producer {
    
        // 队列名称
        private static final String QUEUE_NAME = "ack_queue";
    
    
        public static void main(String[] args) throws Exception {
    
            // 创建一个Channel
            Channel channel = ConnectionUtils.getConnection().createChannel();
    
            /**
             * 创建一个队里
             *
             * 队列名称
             * 队列是否持久化  true:持久化
             * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
             * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                //准备消息
                String msg = scanner.next();
    
                /**
                 * 发布消息
                 *
                 * 发送到那个交换机
                 * 路由key
                 * 其他参数
                 * 发送消息的消息体
                 */
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
                System.out.println("发送消息完成:" + msg);
            }
        }
    
    }
    

    3、Consumer1

    /**
     * 接收消息
     */
    public class Consumer1 {
    
        // 队列名称
        private static final String QUEUE_NAME = "ack_queue";
    
        public static void main(String[] args) throws Exception {
            // 创建一个Channel
            Channel channel = ConnectionUtils.getConnection().createChannel();
    
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            System.out.println("C1等待接收消息。。。。。");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                try {
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String msg = new String(message.getBody());
                System.out.println(msg);
                // 单个应答
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
    
            // 取消消息回调
            CancelCallback cancelCallback = (String consumerTag) -> {
                System.out.println("消息消费被中断。。。");
            };
    
            /**
             * 消费消息
             *
             * 消息队列
             * 消费成功之后是否要自动应答
             * 消费成功/失败回调
             */
            //标记消息手动应答
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
        }
    }
    

    4、Consumer2

    /**
     * 接收消息
     */
    public class Consumer2 {
    
        // 队列名称
        private static final String QUEUE_NAME = "ack_queue";
    
        public static void main(String[] args) throws Exception {
            // 创建一个Channel
            Channel channel = ConnectionUtils.getConnection().createChannel();
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            System.out.println("C2等待接收消息。。。。。");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String msg = new String(message.getBody());
                System.out.println(msg);
                // 单个应答
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
    
            // 取消消息回调
            CancelCallback cancelCallback = (String consumerTag) -> {
                System.out.println("消息消费被中断。。。");
            };
    
            /**
             * 消费消息
             *
             * 消息队列
             * 消费成功之后是否要自动应答
             * 消费成功/失败回调
             */
            //标记消息手动应答
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
        }
    }
    

    5、消息生成

    11
    发送消息完成:11
    22
    发送消息完成:22
    33
    发送消息完成:33
    44
    发送消息完成:44
    55
    发送消息完成:55
    66
    发送消息完成:66
    

    6、Consumer1消费情况

    22
    

    7、Consumer2消费情况

    11
    33
    44
    55
    66
    

    相关文章

      网友评论

        本文标题:07-RabbitMQ持久化

        本文链接:https://www.haomeiwen.com/subject/nxwvultx.html