美文网首页MQ
05-Work Queues模式

05-Work Queues模式

作者: 紫荆秋雪_文 | 来源:发表于2021-07-06 07:19 被阅读0次
    Work Queues模式.png

    当多个消费者同时监听一个队列时,默认消费者会轮询消费消息

    一、轮询消费

    1、Producer

    /**
     * 消息生产者
     */
    public class Producer {
    
        // 队列名称
        private static final String QUEUE_NAME = "hello_world";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工程
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.16.176.100");
            factory.setUsername("raven");
            factory.setPassword("raven");
    
            // 获取一个连接
            Connection connection = factory.newConnection();
            // 创建一个Channel
            Channel channel = connection.createChannel();
    
            /**
             * 创建一个队里
             *
             * 队列名称
             * 队里里面的消息是否持久化
             * 该队列是否只供一个消费者进行消费,是否进行共享,true 可以多个消费者共享
             * 是否自动删除,最后一个消费者端开连接以后,该队列是否自动删除 true 自动删除
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
    
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                //准备消息
                String msg = scanner.next();
    
                /**
                 * 发布消息
                 *
                 * 发送到那个交换机
                 * 路由key
                 * 其他参数
                 * 发送消息的消息体
                 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("发送消息完成:" + msg);
            }
        }
    
    }
    

    2、Consumer

    /**
     * 接收消息
     */
    public class Consumer {
    
        // 队列名称
        private static final String QUEUE_NAME = "hello_world";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工程
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.16.176.100");
            factory.setUsername("raven");
            factory.setPassword("raven");
    
            // 获取一个连接
            Connection connection = factory.newConnection();
            // 创建一个Channel
            Channel channel = connection.createChannel();
    
            System.out.println("C等待接收消息。。。。。");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody());
                System.out.println(msg);
            };
    
            // 取消消息回调
            CancelCallback cancelCallback = (String consumerTag) -> {
                System.out.println("消息消费被中断。。。");
            };
    
            /**
             * 消费消息
             *
             * 消息队列
             * 消费成功之后是否要自动应答
             * 消费成功/失败回调
             */
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    

    3、Consumer1

    /**
     * 接收消息
     */
    public class Consumer1 {
    
        // 队列名称
        private static final String QUEUE_NAME = "hello_world";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建一个连接工程
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("172.16.176.100");
            factory.setUsername("raven");
            factory.setPassword("raven");
    
            // 获取一个连接
            Connection connection = factory.newConnection();
            // 创建一个Channel
            Channel channel = connection.createChannel();
    
            System.out.println("C1等待接收消息。。。。。");
    
            // 接收消息回调
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                String msg = new String(message.getBody());
                System.out.println(msg);
            };
    
            // 取消消息回调
            CancelCallback cancelCallback = (String consumerTag) -> {
                System.out.println("消息消费被中断。。。");
            };
    
            /**
             * 消费消息
             *
             * 消息队列
             * 消费成功之后是否要自动应答
             * 消费成功/失败回调
             */
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
        }
    }
    

    4、发送消息

    11
    发送消息完成:11
    22
    发送消息完成:22
    33
    发送消息完成:33
    44
    发送消息完成:44
    55
    发送消息完成:55
    66
    发送消息完成:66
    

    5、消费消息

    C等待接收消息。。。。。
    11
    33
    55
    
    C1等待接收消息。。。。。
    22
    44
    66
    

    相关文章

      网友评论

        本文标题:05-Work Queues模式

        本文链接:https://www.haomeiwen.com/subject/pjpmultx.html