美文网首页
RabbitMQ (四)工作队列

RabbitMQ (四)工作队列

作者: 薛晨 | 来源:发表于2016-11-02 20:48 被阅读144次

    RabbitMQ官网中文版教程:

    http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html

    上述教程示例为pathon版,Java版及相应解释如下:

    生产者

    package com.xc.rabbitofficial.queue;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    /**
     * 生产者
     *
     * Created by xc.
     */
    public class NewTask {
    
        private final static String QUEUE_NAME = "task_queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            for(int i = 0; i < 5; i++) {
                String message = "I love noodles " + i;
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] sent ' " + message + " '");
            }
    
            channel.close();
            connection.close();
        }
    }
    
    

    消费者1

    package com.xc.rabbitofficial.queue;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * 消费者1
     *
     * Created by xc.
     */
    public class Worker1 {
    
        private final static String QUEUE_NAME = "task_queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL + C");
    
            // 每次从队列中获取数量 (accept only one unack-ed message at a time)
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Worker1 [x] Received '" + message + "'");
    
                    try {
                        doWork(message);
                    } finally {
                        System.out.println("Worker1 [x] Done");
                        // 消息处理完成确认
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
        private static void doWork(String task) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    消费者2

    package com.xc.rabbitofficial.queue;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * 消费者2
     *
     * Created by xc.
     */
    public class Worker2 {
    
        private final static String QUEUE_NAME = "task_queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            factory.setVirtualHost("/");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL + C");
    
            // 每次从队列中获取数量 (accept only one unack-ed message at a time)
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Worker2 [x] Received '" + message + "'");
    
                    try {
                        doWork(message);
                    } finally {
                        System.out.println("Worker2 [x] Done");
                        // 消息处理完成确认
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
        private static void doWork(String task) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    先启动两个消费者,在启动生产者,结果如下:

    1. 生产者


    2. 消费者1


    3. 消费者2


    默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

    API

    1. void basicAck(long deliveryTag, boolean multiple) throws IOException;

    deliveryTag:该消息的标识
    multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

    1. void basicQos(int prefetchCount) throws IOException;

    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

    相关文章

      网友评论

          本文标题:RabbitMQ (四)工作队列

          本文链接:https://www.haomeiwen.com/subject/qlxbuttx.html