美文网首页
RabbitMQ-工作队列

RabbitMQ-工作队列

作者: jiahzhon | 来源:发表于2020-07-20 16:28 被阅读0次
  • work queues 工作队列 轮询分发
image.png
  • 为什么会出现工作队列

    • simple队列 是一一对应的,实际开发中,生产者发送消息是毫不费力的,而消费者一般是与业务结合的,消费者接收到消息后就需要处理,可能需要花费时间。这时候队列就会挤压着很多消息。
  • 生产者:

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取channel
        Channel channel = connections.createChannel();
        //
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50; i++) {
            String msg = "workQueue" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("发送的消息" + msg);
            Thread.sleep(i + 20);
        }
        channel.close();
        connections.close();
    }
}
  • 消费者1
public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws Exception {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取通道
        Channel channel = connections.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            // 一旦有消息到达,就触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("消费者1" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1]  done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}
  • 消费者2
public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws Exception {
        // 获取链接
        Connection connections = ConnectionUtils.getConnections();
        // 获取通道
        Channel channel = connections.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义一个消费者

        Consumer consumer = new DefaultConsumer(channel) {
            // 一旦有消息到达,就触发这个方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("消费者2" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2]  done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

现象:消费者1和消费者2处理的数据消息是一样的,
消费者1都是偶数
消费者2都是奇数
这种方式叫 轮询分发(round-robin)结果是 不管谁忙或者谁闲,任务消息都是一边一个轮询发

公平分发

  • 生产者:在消费者返回确认消息之前,只分发一个消息
        /**
         * 每个消费者发送确认消息之前,消息队列不发送下一个消息,一次只处理一个消息
         */
        int prefecthCount=1;
        channel.basicQos(prefecthCount);

  • 消费者
    • 确认每次只收到一个消息
    • 每次处理完消息要返回确认信息
    • 自动应答 关闭
1595235320(1).jpg

现象:消费者1处理得比消费者2多(能者多劳,1模拟暂停时间是1000,2模拟暂停时间是2000)

  • 消息应答消息持久化
    boolean autoAck = false;//自动应答=false
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    • boolean autoAck = true;
      • 自动确认模式,一旦rabbitmq将消息分发给消费者,就会从内存中删除。这种情况,如果杀死正在执行任务的消费者,则会丢失正在处理的消息。
    • boolean autoAck = false;
      • 手动模式,如果有一个消费者挂掉,就会交付给其他消费者。rabbitMQ支出消息应答,消费者处理完消息后,给abbitmq发送确认消息,rabbitmq收到后就会删除内存中的消息。
    • 消息应答默认是打开的==>false
  • 消息的持久化

    • boolean autoAck = false确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
      boolean durable=false;//持久化
      channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
    • 我们将程序中的 boolean durable=false 直接改成true是不可以的,因为test_work_queue队列已经定义成未持久化的队列,rabbitmq不允许重新定义已经存在的队列。

相关文章

  • RabbitMQ-工作队列

    work queues 工作队列 轮询分发 为什么会出现工作队列simple队列 是一一对应的,实际开发中,...

  • RabbitMQ-消费消息-basicConsume

    RabbitMQ-消费消息 basicConsume方法 queue 队列名 autoAck 是否自动确认消息,t...

  • SpringBoot整合RabbitMQ1(Direct Exc

    一. 一些基本概念 消息队列 面试官问你什么是消息队列?把这篇甩给他! RabbitMQ RabbitMQ-简书柯...

  • SpringBoot整合RabbitMQ1(Topic Exch

    一. 一些基本概念 消息队列 面试官问你什么是消息队列?把这篇甩给他! RabbitMQ RabbitMQ-简书柯...

  • RabbitMQ-简单队列

    简单队列 添加依赖 P:消息的生产者-->队列-->消费者 连接rabbitmq 发送消息 监听(接收消息) 简单...

  • rabbitMQ-延时队列

    延时队列我们可以简单粗暴的理解它为延时发送消息的队列 那延时队列的应用场景有哪些呢,比如订单在一段时间内未支付则取...

  • rabbitmq-死信队列

    因为可能用到死信队列,所以自己写了一个demo.不知道会不会用到呢... 刚开始认为死信队列是将超时/失败的消息放...

  • RabbitMQ-消息队列

    四种交换器 Virtual hosts 架构模型 RabbitMQ Server: 也叫broker server...

  • RabbitMQ—Android客户端

    工作原理 RabbitMQ-中文文档RabbitMQ-官网上面的网站详细的描述了整个框架。建议没接触过Rabbit...

  • RabbitMQ入门-消息派发那些事儿

    在上篇《RabbitMQ-高效的Work模式》中,我们了解了Work模型,该模型包括一个生产者,一个消息队列和多个...

网友评论

      本文标题:RabbitMQ-工作队列

      本文链接:https://www.haomeiwen.com/subject/jjjykktx.html