RabbitMQ (4): Work Queues

Author: 薛晨 | Published: 2016-11-02 20:48

Chinese translation of the official RabbitMQ tutorial:

http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html

The examples in that tutorial are written in Python. The Java version, with explanations, follows:

Producer

package com.xc.rabbitofficial.queue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * Producer
 *
 * Created by xc.
 */
public class NewTask {

    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for(int i = 0; i < 5; i++) {
            String message = "I love noodles " + i;
            // Encode explicitly as UTF-8 so the bytes match what the workers decode
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] sent ' " + message + " '");
        }

        channel.close();
        connection.close();
    }
}

Consumer 1

package com.xc.rabbitofficial.queue;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer 1
 *
 * Created by xc.
 */
public class Worker1 {

    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL + C");

        // Prefetch count: accept only one unacknowledged message at a time
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1 [x] Received '" + message + "'");

                try {
                    doWork(message);
                } finally {
                    System.out.println("Worker1 [x] Done");
                    // Acknowledge that the message has been processed
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

Consumer 2

package com.xc.rabbitofficial.queue;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Consumer 2
 *
 * Created by xc.
 */
public class Worker2 {

    private final static String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL + C");

        // Prefetch count: accept only one unacknowledged message at a time
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker2 [x] Received '" + message + "'");

                try {
                    doWork(message);
                } finally {
                    System.out.println("Worker2 [x] Done");
                    // Acknowledge that the message has been processed
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

Start the two consumers first, then start the producer. The results are as follows:

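  1. Producer [screenshot: producer console output]

  2. Consumer 1 [screenshot: Worker1 console output]

  3. Consumer 2 [screenshot: Worker2 console output]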

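By default, RabbitMQ sends each message to the next consumer in sequence, so on average every consumer receives the same number of messages. This way of distributing messages is called round-robin. Because the workers above call basicQos(1), the broker additionally holds back a worker's next message until that worker has acknowledged the previous one.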
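To make the round-robin idea concrete, here is a minimal sketch in plain Java that needs no broker. It only models the distribution order described above; the class name RoundRobinSketch and the method dispatch are made up for illustration and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin dispatch (hypothetical names, not RabbitMQ API).
class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount).
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        // Same five messages the producer above publishes.
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            messages.add("I love noodles " + i);
        }
        List<List<String>> inboxes = dispatch(messages, 2);
        for (int c = 0; c < inboxes.size(); c++) {
            System.out.println("Worker" + (c + 1) + " <- " + inboxes.get(c));
        }
    }
}
```

With five messages and two workers, this model gives messages 0, 2 and 4 to the first worker and messages 1 and 3 to the second.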

API

  1. void basicAck(long deliveryTag, boolean multiple) throws IOException;

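deliveryTag: the tag that identifies the message being acknowledged.
multiple: whether to acknowledge in bulk. If true, every message up to and including deliveryTag is acknowledged at once; if false, only the message with that tag is acknowledged.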
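The effect of the multiple flag can be sketched without a broker by tracking the set of unacknowledged delivery tags. This is a toy model under that assumption, not the client's implementation; AckSketch, deliver, ack and pending are hypothetical names.

```java
import java.util.TreeSet;

// Toy model of the unacknowledged-tag bookkeeping on one channel (hypothetical names).
class AckSketch {

    // Delivery tags that have been delivered but not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();

    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // Same parameter shape as basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag;
            // clearing the view removes them from the backing set.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> pending() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        for (long tag = 1; tag <= 5; tag++) {
            channel.deliver(tag);
        }
        channel.ack(3, true);   // acknowledges tags 1, 2 and 3 together
        System.out.println("pending after ack(3, true): " + channel.pending());
        channel.ack(5, false);  // acknowledges tag 5 only
        System.out.println("pending after ack(5, false): " + channel.pending());
    }
}
```

The workers in this article pass false, so each call acknowledges exactly the message that was just processed.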

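  2. void basicQos(int prefetchCount) throws IOException;

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

prefetchCount: the maximum number of unacknowledged messages the server will deliver; 0 means unlimited.
prefetchSize: the maximum amount of content, in octets, that the server will deliver; 0 means unlimited.
global: true if the settings apply to the entire channel rather than to each consumer.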
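To illustrate why the workers call basicQos(1), here is a rough simulation of dispatch with a prefetch count of 1: a worker is offered a new message only once it has finished the previous one. It is a simplified model under stated assumptions (all messages are queued at time zero, each worker takes a fixed time per task, ties go to the lowest-numbered worker) and not the broker's actual scheduling algorithm. PrefetchSketch and assign are hypothetical names.

```java
import java.util.Arrays;

// Toy simulation of fair dispatch with a prefetch count of 1 (hypothetical names).
class PrefetchSketch {

    // Returns, for each message, the index of the worker that receives it.
    // millisPerTask[w] is the assumed fixed processing time of worker w.
    static int[] assign(int messageCount, long[] millisPerTask) {
        long[] freeAt = new long[millisPerTask.length]; // time at which each worker is idle again
        int[] assignedTo = new int[messageCount];
        for (int m = 0; m < messageCount; m++) {
            // Pick the worker that becomes idle first; ties go to the lowest index.
            int next = 0;
            for (int w = 1; w < freeAt.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;
                }
            }
            assignedTo[m] = next;
            freeAt[next] += millisPerTask[next];
        }
        return assignedTo;
    }

    public static void main(String[] args) {
        // Worker 0 needs 1 s per task, worker 1 needs 3 s per task.
        System.out.println(Arrays.toString(assign(5, new long[] {1000, 3000})));
        // Equal speeds: the assignment alternates like plain round-robin.
        System.out.println(Arrays.toString(assign(5, new long[] {1000, 1000})));
    }
}
```

In this model the faster worker ends up with four of the five messages, while two equally fast workers alternate exactly as under plain round-robin, which matches the behaviour seen with the two identical workers above.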
