美文网首页
RabbitMQ轮询分发和公平分发

RabbitMQ轮询分发和公平分发

作者: 裂开的汤圆 | 来源:发表于2020-11-21 20:51 被阅读0次

    轮询分发

    先看代码,生产者生产十条消息。开启两个消费者,组成工作队列,消费者1消费完一条消息后将线程挂起1秒,消费者2消费完一条消息后将线程挂起两秒。

    生产者:

    public class Send {
        private static final String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("sending...");
            for(int i=0; i < 10; i++){
                String msg = "第" + i + "条消息";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    public class Rec1 {
        private static final String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 接收到消息后的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
                // 线程挂起1秒
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
    
            // 监听队列,每当队列中接收到新消息后会触发回调函数
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    

    消费者2:

    public class Rec2 {
        private static final String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 接收到消息后的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
                // 线程挂起两秒
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
    
            // 监听队列,每当队列中接收到新消息后会触发回调函数
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    
        }
    }
    

    轮询分发运行结果

    消费者1消费情况 消费者2消费情况

    思考

    根据上述截图我们可以发现,轮询分发不会根据消费者的消费情况进行分发,永远都是一人一条的分发。但这样带来的问题在于,浪费了强消费者的性能(可以看到上述截图中,消费者1在12:03:51秒就已经消费完毕了,后面处于空闲状态,而消费者2在12:05:55秒才消费完毕,消费者1闲置了51s-55s大约四秒钟的时间)。

    发生轮询分发的原因在于RabbitMQ在消息进入队列时才调度消息。它不会查看使用者的未确认消息数。它只是盲目的将第N条消息发送给第N个使用者。

    解决上述问题可以采用以下语句。这告诉RabbitMQ一次只给消费者一条消息,并且在消费者处理并确认该消息后,再将新消息发送给消费者。

    channel.basicQos(1);
    

    公平分发消费者代码

    public class Rec1{
        private static final String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取连接
            Connection connection = ConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            // 一次仅接受一条未经确认的消息,
            channel.basicQos(1);
    
            // 接收到消息后的回调函数
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                // 返回确认消息给rabbitmq
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
    
            // 关闭自动消息确认
            // 消费者发送回确认,以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
            // 如果消费者在不发送确认的情况下挂掉,RabbitMQ将了解消息未得到充分处理,并将重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。
            boolean autoAck = false;
            // 监听队列,每当队列中接收到新消息后会触发回调函数
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
        }
    }
    

    采用公平分发后的执行结果

    消费者1消费截图 消费者2消费截图

    可以看到现在就不会出现一个消费者工作,一个消费者闲置的问题

    相关文章

      网友评论

          本文标题:RabbitMQ轮询分发和公平分发

          本文链接:https://www.haomeiwen.com/subject/fhczqktx.html