美文网首页
RabbitMQ六种队列模式-工作队列模式

RabbitMQ六种队列模式-工作队列模式

作者: 呆叔么么 | 来源:发表于2020-01-16 10:45 被阅读0次

    上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。

    但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。


    work

    1. 什么是工作队列

    工作队列:用来将耗时的任务分发给多个消费者

    主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

    2. 代码部分

    生产者

    package cn.lovingliu.rabbitmq_work.producer;
    
    import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author:LovingLiu
     * @Description: 生产者
     * @Date:Created in 2020-01-16
     */
    public class Producer {
        /** 队列名称 */
        private static final String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.创建通道 */
            Channel channel = newConnection.createChannel();
            /** 3.创建队列声明 */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /** 在收到消费者确认回执消息之前 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 避免重复消费 */
            channel.basicQos(1);
            for (int i = 1; i <= 100; i++) {
                String msg = "生产者消息_" + i;
                System.out.println("生产者发送消息:" + msg);
                /** 4.发送消息 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            newConnection.close();
        }
    }
    

    消费者1

    package cn.lovingliu.rabbitmq_work.consumer;
    
    import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author:LovingLiu
     * @Description: 消费者1
     * @Date:Created in 2020-01-16
     */
    public class Customer_1 {
        /**
         * 队列名称
         */
        private static final String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("001");
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.获取通道 */
            final Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(1000);// 阻塞事件更长,代表该消息处理事件更长
                    } catch (Exception e) {
    
                    } finally {
                        /** 手动回执消息 */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            /** 3.监听队列 */
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }
    

    消费者2

    package cn.lovingliu.rabbitmq_work.consumer;
    
    import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author:LovingLiu
     * @Description: 消费者1
     * @Date:Created in 2020-01-16
     */
    public class Customer_2 {
        /**
         * 队列名称
         */
        private static final String QUEUE_NAME = "test_queue_work";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("002");
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.获取通道 */
            final Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
    
                    } finally {
                        /** 手动回执消息 */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            /** 3.监听队列 */
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }
    

    3.测试结果

    生产者生产消息
    consumer1
    consumer2

    4.分发机制

    • 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

    • 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

    5.实现公平分发

    由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。注意:要实现公平分发,必须关闭自动应答
    公平分发
    其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP中的UDP,面向无连接。

    因此我们可以使用 basicQos 方法,并将参数 prefetchCount设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

    关键性代码:

    /** 2.获取通道 */
    final Channel channel = newConnection.createChannel();
    /** 设置为手动应答
    */
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
    channel.basicQos(1);
    

    6.消息确认模式

    消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

    模式1:自动确认
    只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
    模式2:手动确认
    消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并在其他消费者空闲时,发给空闲消费者。
    手动模式:

    手动确认
    自动模式:
    自动确认

    7.消息持久化

    问题背景
    上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

    当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

    这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

    但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

    怎么办呢?
    参数配置

    • 参数配置一:生产者创建队列声明时,修改第二个参数为 true
    /**3.创建队列声明 */
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    • 参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN
    for (int i = 1; i <= 50; i++) {
        String msg = "生产者消息_" + i;
        System.out.println("生产者发送消息:" + msg);
        /**4.发送消息 */
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    }
    

    实现持久化

    package cn.lovingliu.rabbitmq.producer;
    
    
    import cn.lovingliu.rabbitmq.utils.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    
    /**
     * @Author:LovingLiu
     * @Description: 生产者发送消息
     * @Date:Created in 2020-01-15
     */
    public class Send {
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws Exception {
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.创建通道 */
            Channel channel = newConnection.createChannel();
            /** 3.创建队列声明 */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化
            String msg = "我是生产者生成的消息";
            System.out.println("生产者发送消息:" + msg);
            /** 4.发送消息 */
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化
            channel.close();
            newConnection.close();
        }
    }
    

    8. 工作队列总结

    1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

    2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会。

    3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

    相关文章

      网友评论

          本文标题:RabbitMQ六种队列模式-工作队列模式

          本文链接:https://www.haomeiwen.com/subject/odexzctx.html