美文网首页
<三>RabbitMQ单发多收(工作队列)

<三>RabbitMQ单发多收(工作队列)

作者: 者薄 | 来源:发表于2017-04-25 19:03 被阅读0次

    单发多理解---打虎基本功

    单发多收简单理解就是:一个生产者生产,多个消费者消费.生产者将生产的消息放入队列当中,由多个消费者从消息队列中取出消息进行消费.这样能提高系统的吞吐能力

    示例场景---四面八方

    一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息.实现原理图如下:


    图片.png

    项目构建步骤---一步一个脚丫子

    1.实现生产者
    在上篇文章中创建的项目中创建一对多任务的生产者(Provider)

    package com.rabbitmq.task;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    /**
     * 消息队列的任务处理提供者P
     * 
     * @author panyuanyuan
     *
     */
    public class TaskP {
        
        private static final String QUEUE_NAME = "task_queue";
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            for(int i = 0; i < 5; i++) {
                String message = "Hello World! " + i;
                //注意,这里第一个参数为""发送到了匿名交换器上,第二个参数为路由线索====>当时匿名交换器的时候,会发消息发送到和路由线索一样的消息队列上
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
            
            //关闭资源,否则一直会链接到rabbitmq服务器
            channel.close();
            connection.close();
            
        }
    }
    

    2.实现消费者1(Consumer1)

    package com.rabbitmq.task;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    /**
     * 任务队列消费者1
     * 
     * @author panyuanyuan
     *
     */
    public class TaskC1 {
        private static final String QUEUE_NAME = "task_queue";
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
            
            //每次从消息队列中获取数量
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
                
                    String message = new String(body,"UTF-8");
                    System.out.println("Worker1 [x] Received '" + message + "'");
                    try {
                        doWork(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //消息处理完成确认
                        System.out.println("Worker1 [x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            
            //消息消费确认完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
        
        /**
         * 处理任务
         * 
         * @param task
         */
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    

    3.实现消费者2(Consumer2)

    package com.rabbitmq.task;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    /**
     * 任务队列消费者1
     * 
     * @author panyuanyuan
     *
     */
    public class TaskC2 {
        private static final String QUEUE_NAME = "task_queue";
        public static void main(String[] args) throws Exception {
            
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            
            //第二个参数为true说明这个队列需持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
            
            //每次从消息队列中获取数量---负载均衡,将消息任务均衡的发送到业务服务器
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
                
                    String message = new String(body,"UTF-8");
                    System.out.println("Worker2 [x] Received '" + message + "'");
                    try {
                        doWork(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //消息处理完成确认
                        System.out.println("Worker2 [x] Done");
                        //回复ack非常重要,只有回复了,消息队列才会删除消息从队列中,负责可能造成队列爆满
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            
            //消息消费确认完成
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
        
        /**
         * 处理任务
         * 
         * @param task
         */
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    

    运行测试---万无一失

    这里省略启动RabbitMQ服务,上篇中已经介绍
    1.运行客户端1(消费者Consumer1)
    运行结果如下:

    Worker1 [*] Waiting for messages. To exit press CTRL+C
    

    2.运行客户端2(消费者Consumer2)
    运行结果如下:

    Worker2 [*] Waiting for messages. To exit press CTRL+C
    

    3.运行生产者(生产者Provider)
    运行结果如下:

     [x] Sent 'Hello World! 0'
     [x] Sent 'Hello World! 1'
     [x] Sent 'Hello World! 2'
     [x] Sent 'Hello World! 3'
     [x] Sent 'Hello World! 4'
    

    4.查看消费者1窗口(消费者Consumer1)
    查看结果如下:

    Worker1 [*] Waiting for messages. To exit press CTRL+C
    Worker1 [x] Received 'Hello World! 0'
    Worker1 [x] Done
    Worker1 [x] Received 'Hello World! 2'
    Worker1 [x] Done
    

    5.查看消费者2窗口(消费者Consumer2)
    查看结果如下:

    Worker2 [*] Waiting for messages. To exit press CTRL+C
    Worker2 [x] Received 'Hello World! 1'
    Worker2 [x] Done
    Worker2 [x] Received 'Hello World! 3'
    Worker2 [x] Done
    Worker2 [x] Received 'Hello World! 4'
    Worker2 [x] Done
    

    注意总结---万剑归宗

    当RabbitMQ为一对多任务的时候,生产者生产消息放入队列,队列分消息给消费者消费,这里消费者获取消息是随机的,但是消费者消费的总和等于生产者生产的.

    致谢

    至此RabbitMQ的单发多收(工作)模式已经完成,感谢大大的微笑的博客,参照博客链接 > http://blog.csdn.net/chwshuang/article/details/50521708

    相关文章

      网友评论

          本文标题:<三>RabbitMQ单发多收(工作队列)

          本文链接:https://www.haomeiwen.com/subject/gotbzttx.html