注:这是RabbitMQ-java版Client的指导教程翻译系列文章,欢迎大家批评指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介绍队列的使用
第三篇Publish/Subscribe介绍转换器以及其中fanout类型
第四篇Routing介绍direct类型转换器
第五篇Topics介绍topic类型转换器
第六篇RPC介绍远程调用
Work Queues
java-two.png在第一篇指导教程中,我们写到应用去发送消息到队列中和从队列中取出消息。在这篇教程中将会创建一个工作队列,用于在多个工作者间按效率分配任务。
工作队列的主要思想是避免马上去做一件消耗资源的任务,而可以等着它完成。换句话说分配任务可以稍后去完成。我们把一个任务当作一个消息,然后发送到一个队列中。一个后台执行的工作进程就可以取出任务,并且最终执行这项任务。当你运行多个工作者的时候,许多工作就可以在这些工作者之间共享完成。
这个概念在web应用中尤其有用,web应用一般是在简短的http请求中去处理复杂的任务。
准备工作(Preparation)
在上篇指导教程中我们发送一条包含字符串Hello World的信息,现在我们将发送代表复杂任务的字符串。我们并没有真正的工作任务,像图片的压缩,或者渲染pdf文件。因此我们通过假装很忙去实现它-使用线程睡眠的功能。我们将字符串中每个小点都当作一个复杂的任务,每一个小点都需要一秒的工作时间。例如,一个伪装的项目为“Hello...”需要三秒来完成。
我们将对以前例子中的Send.java类中稍作修改,允许通过命令行发送任意的消息。这个程序将会分配任务到我们的工作队列中,因此命名为NewTask.java:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
从命令行的参数中获取到消息:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, ".");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
以前例子中的Recv.java也需要做一些修改:它需要在消息中对每一个小点做一个一秒的消耗,将处理被分发到的的消息并最终完成任务。因此命名为Work.java:
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below消息的应答机制
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); //接受消息的监听
捏造的任务需要计算时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
编译它们就像在上一篇指导教程中一样
javac -cp $CP NewTask.java Worker.java
循环分发机制(Round-robin dispatching)
创建一个队列的优势之一就是可以方面的同时做各个任务,如果我们需要处理大量挤压的工作,只需要增加更多工作者就可以解决。
第一步同时运行两个工作者应用,它们将都会从队列中获取到消息。但是如何获取呢,我们拭目以待。
需要把三个控制台打开,两个运行工作者应用,也就是我们的两个消费者:C1和C2。
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
第三个用来发布新的任务,只要先创建好了消费者,你就发布一些消息:
# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....
我们来看看是如何分发给工作者的:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
以上可见,RabbitMQ默认的方式是有序的发送消息给每一个消费者,一般每个消费者都会持有相同数目的消息。这种分发消息的方式叫做循环分发机制(round-robin)。可以尝试三个或者更多的工作者。
消息应答机制(Message acknowledgment)
完成一项任务需要花费一些时间。你可能会想:当一个消费者处理一个比较耗时的任务时,只完成一部分就死掉了会发生什么事情?按照我们目前的代码来看,RabbitMQ分发消息给消费者后就立刻从内存中移除该消息。这种情境中,如果你杀死这个消息者,被消费者正处理的消息就会被丢失。由此可见,这样分发的所有消息还没有被消费者处理就全部丢失了。
但是我们并不像丢失任何的任务,如果一个工作者死了,我们想把这个任务转发给其他的工作者。
为了确保这个消息没有被丢失,RabbitMQ支持消息应答机制。消息应答机制是当消费者接受到消息后可以返回一条信息告诉RabbitMQ这条消息已经被接受并处理了,然后RabbitMQ就可以删除该消息。
如果一个消费者死了(通道关闭,连接关闭,或者TCP连接失败),没有发送一个应答反馈,RabbitMQ将会理解为这个消息还没有被处理,那么消息还将存在消息队列中。如果同时还有其它的消费者存活着,这个消息将会被重新发送给其它的消费者。这种方式就可以确保没有消息丢失,即使消费者意外死了。
没有处理消息超时的情况,除非这个消费者死了RabbitMQ才会重新分发消息。如果处理一个消息要花费很长很长的时间,RabbitMQ也会等待。
消息应答机制默认是开启的,但是在上一篇指导教程中是通过设置autoAck=true标识将他们关闭的。现在将这个标识设置为false,并且工作者在完成任务后,发送一条合适的应答反馈。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
我们相信:使用这种方法,即使通过CTRL+C杀死一个正在处理消息的工作者,也没有消息会丢失。在这个工作者死了之后,没有应答反馈的消息将会被重新分发。
忘记了消息应答反馈
这是一个常见的错误:没有调用basicAck方法。也是很容易犯的错误,但是结果很严重。当消费者放弃时,消息将会被重新分发(看起来会很少分发)。但是RabbitMQ将会消耗大量的内存,因为它不能够释放任何未应答反馈的消息。
为了调试这类型的错误,可以使用rabbitmqctl去打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在window上,不需要sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息的持久性(Message durability)
我们已经知道如何确保在消费者死了的情况下,这个任务还不会丢失。但是在RabbitMQ服务端停止的时候,这个任务还是会丢失。
当RabbitMQ服务端停止或者崩溃的时候,它的所有队列和消息都会丢失,除非你告诉它两件事才能确保消息不会被丢失:标记队列和消息持久化。
第一步,我们确保RabbitMQ不会丢失队列,为了达到这个目的,需要将它声明持久化:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
尽管这行代码本身是正确的,但是对于已经存在的队列再次创建是不会有效的,因为我们已经定义了一个不是持久化队列的名字叫Hello。RabbitMQ不会容许你重新定义一个带有不同参数的已经存在的队列,任何应用这么做的话,它将会返回一个错误。但是这里有变通方案:声明一个不一样的队列名字。例如:task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
这个变化的方法需要同时应用在生产者和消费者的代码中。
现在我们可以确定,即使RabbitMQ重新启动,task_queue队列都不会丢失。接着我们需要标记消息持久化,通过设置MessageProperties(它实现了BasicProperties接口)的值为PERSITENT_TEXT_PLAIN.
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意:消息持久化
标记消息持久化并不能完全保证消息一定不会丢失。虽然已经告诉RabbitMQ需要持久化消息到硬盘上,但还有一小段时间,当RabbitMQ已经接受到消息但还没有保存的时候,或者RabbitMQ没有为每个消息异步操作时,就有可能值保存在缓存中而没有写入到磁盘上。这个持久化的保证并不强大,但是已经足够我们简单任务队列的使用。如果你需要更强大的保证,你可以使用发布确认机制。
高效分发(Fair dispatch)
你可能已经注意到这样的分发方式仍不是我们想的那样。举例来说,一个场景中有两个工作者,当有些消息是复杂的,有些消息是简单的,一个工作者可能会一直很忙,而另外一个工作者几乎不用做什么工作。RabbitMQ并不知道它们的工作情况,并且仍会循环的分发消息。
当消息进入队列中,RabbitMQ仅仅只是分发消息,它并不会查看消费者的反馈信息,只是盲目的分发第N个消息给第N个人。
prefetch-count.png
为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount属性值为1。这个表明在一个时间点RabbitMQ不要分发更多的消息给工作者,或者换句话说,直到上个消息被处理并且得到应答反馈,才能分发一条新的消息给这个工作者。或者RabbitMQ会分发给一个工作不忙的工作者这个新消息。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意队列的大小
如果所有的工作者都很忙,队列可能溢出。你需要注意这方面问题,如果存在这个问题可以添加更多的工作者,或者使用其他的一些策略方式。
综合
最终NewTask.java类的代码,这里下载:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
以及Work.java代码,这里下载:
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}}}}}
在创建工作队列时,使用消息应答机制和basicBos的参数值设置;即使RabbitMQ服务端重启,通过设置持久化的一些选项就可以保存任务。
想了解更多相关Channel的信息和MessageProperties属性,可以点击这里查看。
第二节的内容大致翻译完了,这里是原文链接。接着进入下一节:Publish/Subscribe。
终篇是我对RabbitMQ使用理解的总结文章,欢迎讨教。
--谢谢--
网友评论