RabbitMQ官网中文版教程:
http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html
上述教程示例为Python版,Java版及相应解释如下:
生产者
package com.xc.rabbitofficial.queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * Producer for the work-queue example.
 *
 * <p>Publishes five text messages to the queue {@code task_queue} through the
 * default exchange (empty exchange name), using the queue name as routing key.
 *
 * Created by xc.
 */
public class NewTask {
    private final static String QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker on localhost:5672, declares the queue, publishes
     * the messages and then closes the channel and the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // durable = true, exclusive = false, autoDelete = false, no extra arguments
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                for (int i = 0; i < 5; i++) {
                    String message = "I love noodles " + i;
                    // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
                    // so relying on the platform default charset could corrupt the text.
                    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("UTF-8"));
                    System.out.println(" [x] sent ' " + message + " '");
                }
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the connection; otherwise its I/O thread keeps the JVM alive.
            connection.close();
        }
    }
}
消费者1
package com.xc.rabbitofficial.queue;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 1 of the work-queue example.
 *
 * <p>Consumes messages from the queue {@code task_queue} with manual
 * acknowledgements and a prefetch count of one.
 *
 * Created by xc.
 */
public class Worker1 {
    private final static String QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker on localhost:5672, declares the queue and registers
     * a consumer. The connection is intentionally left open so the consumer keeps
     * receiving deliveries until the process is terminated.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL + C");

        // Accept only one unacknowledged message at a time.
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker1 [x] Received '" + message + "'");

                long deliveryTag = envelope.getDeliveryTag();
                try {
                    doWork(message);
                } catch (RuntimeException e) {
                    System.out.println("Worker1 [x] Failed");
                    // Do not acknowledge failed work. Requeue the message once; if it
                    // was already redelivered, reject it without requeueing so that a
                    // poison message cannot loop forever.
                    channel.basicNack(deliveryTag, false, !envelope.isRedeliver());
                    throw e;
                }

                System.out.println("Worker1 [x] Done");
                // Confirm that the message has been processed successfully.
                channel.basicAck(deliveryTag, false);
            }
        };
        // autoAck = false: messages are acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    /**
     * Simulates processing a task by sleeping for one second.
     *
     * @param task the message text; currently not inspected
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
消费者2
package com.xc.rabbitofficial.queue;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 2 of the work-queue example.
 *
 * <p>Consumes messages from the queue {@code task_queue} with manual
 * acknowledgements and a prefetch count of one.
 *
 * Created by xc.
 */
public class Worker2 {
    private final static String QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker on localhost:5672, declares the queue and registers
     * a consumer. The connection is intentionally left open so the consumer keeps
     * receiving deliveries until the process is terminated.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        // durable = true, exclusive = false, autoDelete = false, no extra arguments
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL + C");

        // Accept only one unacknowledged message at a time.
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Worker2 [x] Received '" + message + "'");

                long deliveryTag = envelope.getDeliveryTag();
                try {
                    doWork(message);
                } catch (RuntimeException e) {
                    System.out.println("Worker2 [x] Failed");
                    // Do not acknowledge failed work. Requeue the message once; if it
                    // was already redelivered, reject it without requeueing so that a
                    // poison message cannot loop forever.
                    channel.basicNack(deliveryTag, false, !envelope.isRedeliver());
                    throw e;
                }

                System.out.println("Worker2 [x] Done");
                // Confirm that the message has been processed successfully.
                channel.basicAck(deliveryTag, false);
            }
        };
        // autoAck = false: messages are acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

    /**
     * Simulates processing a task by sleeping for one second.
     *
     * @param task the message text; currently not inspected
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
先启动两个消费者,再启动生产者,结果如下:
-
生产者
-
消费者1
-
消费者2
默认情况下,RabbitMQ会按顺序地把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量的消息。这种发送消息的方式叫做——轮询(round-robin)。
API
- void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的标识
multiple:是否批量。true:一次性ack所有小于等于deliveryTag的消息(包含deliveryTag对应的消息本身);false:只ack deliveryTag对应的这一条消息。
- void basicQos(int prefetchCount) throws IOException;
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
网友评论