美文网首页
(翻译)RabbitMQ官网教程(2)WorkQueues

(翻译)RabbitMQ官网教程(2)WorkQueues

作者: zhangyaqi | 来源:发表于2020-08-09 17:48 被阅读0次

原文链接: https://www.rabbitmq.com/tutorials/tutorial-two-java.html

在第一章教程中,我们写了面向命名队列发送和接收消息的程序。在本章中我们将创建一个工作队列来向多个worker分发耗时任务。

工作队列(也叫任务队列)的思路就是避免立即执行系统资源占用高的的阻塞任务,而是在以异步的方式来执行。我们可以将工作任务用消息来描述然后将消息发送给队列,然后后台运行的worker接收该消息并最终完成对应的工作任务。如果你启动了多个worker,工作任务将在这些worker中被共享。

工作队列在web应用中相当有用,因为web应用无法短暂的HTTP请求期间处理复杂任务。

准备内容

在之前的教程中,我们向RabbitMQ发送了一条"HelloWorld"的消息,现在我们要发送的是描述工作任务的消息。由于我们没有调整图片大小或者渲染PDF文件之类的耗时任务,所以只能通过调用Thread.sleap()的方式来模拟一个耗时任务。我们以消息中点『.』的数量来表示任务的复杂度,每一个点表示需要执行一秒。比如『Hello...』这个任务需要执行3秒。

为了能够发送命令行的消息,我们需要将之前的Send.java的代码做一些微调。这个程序可以将任务发送到工作队列中,所以命名为NewTask.java.

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

之前的Recv.java的代码也需要做一些调整,接收到的消息内容中每有一个点就假装执行一秒的任务。能处理分发过来的消息并执行相关的任务,所以命名为Worker.java。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

模拟执行时间的任务:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

像第一章的教程那样编译文件:

javac -cp $CP NewTask.java Worker.java

轮询调度

使用任务队列最大的好处就是可以更容易的并行执行任务。当积压了很多工作任务是,只需要引入更多的worker就可以提高处理速度。

首先需要启动两个Worker实例,它们都可以从队列中获取消息,该怎么做呢?

你要先打开三个控制台窗口,其中两个用来运行worker,这就是我们的两个Consumer——C1和C2:

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

在第三个控制台中,我们会发布一些任务(只要启动了Consumer就可以发布任务了)

# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

默认情况下,RabbiMQ会按照顺序将消息发送给Consumer,所以每个Consumer收的消息数量是一致的。这种消息分发方式被叫做『轮询』

消息确认

任务执行会花一些时间,你可能想知道Consumer在执行一个很耗时的任务时如果中途挂掉会发生什么吧。在我们当前的代码中,RabbiMQ一旦将消息分发给Consumer,就会将消息表示已删除。因此我们就会丢失正在执行中的任务以及刚被分发到该Consumer还没来得及处理的消息。

可是我们并不希望丢失任何任务,如果worker挂掉,我们希望的是RabbiMQ能够将任务分派给其他的worker。

RabbiMQ通过消息确认机制来确保消息不会丢失。确认消息是Consumer发送给RabbiMQ的,用来告诉RabbiMQ消息已经收到并正常完毕,可以删除了。

如果Consumer挂掉(Channel关闭、Connection关闭、或者Socket连接关闭),RabbiMQ就会认为Consumer没有完成该任务,然后将该任务重新添加到工作队列中。如果此时有其他的Consumer在线,RabbitMQ就会将任务重新分派给其它Consumer。通过这种方式即使worker偶尔挂掉也能保证任务不会丢失。

RabbiMQ中的消息不会过期:当Consumer挂掉的时候RabbiMQ会将消息进行重新分派;如果执行任务需要很长时间也是可以的并不会超时。

默认情况下,RabbiMQ需要手动进行消息确认。在上边的例子中,我们通过设置autoAck=true来自动确认消息。现在我们将值设置为false,这样当任务执行完毕时worker就需要手动发送确认消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

这样即使我们在任务执行过程过用Ctrl+C让worker挂掉,也不会丢失任务消息。Worker挂掉后,未经确认的消息很快就会被重新分派。

发送确认消息的Channel必须时和接收消息的Channel保持一致。用不同的Channel来发送确认消息会导致Channel层面的协议异常。详情请看doc guide on confirmations

忘了进行消息确认

很容易犯的一个错误是忘记调用basicAck方法, 这个错误的后果很严重。因为忘记进行消息确认的话,Consumer退出时RabbiMQ就会将消息进行重新分派,这样就造成RabbiMQ收不到任何的确认消息,导致无法释放Queue中的消息,占用的内存就会越来越多。

可以通过rabbitmqctl打印出messages_unacknowledged的消息,以此来debug这种情况。

  sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows系统:

  rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久化

我们已经了解了和确保在Consumer挂掉的情况下不会丢失消息。但是,当RabbiMQ宕机的时候依旧会丢失消息。

如果我们不做特殊处理的话,RabbiMQ在退出或者挂掉的时候会丢失队列以及相应的消息。为了确保消息不会丢失,我们需要做的就是将队列和消息都标记为可持久化的。

首先我们要确保在RabbiMQ重启的时候队列表依旧存在,为此我们需要将队列标记为可持久化的。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这条命令本身没什么问题,但当前情况下它并不会起作用。那是因为我们之前已经定义了名为『hello』的队列,而且不是可持久化的。RabbiMQ不允许用不同的初始参数来重定义队列,真要这么做就会报错。最直接的办法就是重新定义一个新的队列,比如task_queue.

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

在Consumer和Producer中定义队列时的queueDeclare参数都要改变。

我们已经可以保证RabbitMQ在重启的时候不会丢失队列。现在我们可以通过设置MessageProperties.PERSISTENT_TEXT_PLAIN值将消息标记为可持久化的。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

消息持久化需要注意的地方:

将消息标记为持久化并不能保证消息就肯定不会丢失。尽管这样可以告诉RabbitMQ将消息存储到磁盘中,但是RabbitMQ从收到消息到存储到磁盘中还是有一个短暂的时间窗口。而且,RabbitMQ并不是每收到一条消息就会执行fsync(2),它有可能只是将消息存储到了内存而费磁盘。这种持久化的方式可用性虽然不是很高,但对于我们现在这种简单的工作任务而言也足够了。如果你需要更高保障的持久化方案,你可以使用publisher confirms

负载均衡

你可能注意到了,尽管我们已经做了很过工作,但消息的分发还是无法按照我们预想的方式运作。比如有两个Worker的情况,其中一个执行的都是极其极其耗时的任务,而另一个执行的都是轻量级任务,这就导致了一个Worker一直处于满载状态,另一个则大部分时间都处于空闲状态。然而RabbitMQ并不清楚这种情况,还是一如既往的平均分配任务给这两个worker。

这种情况之所以会发生是因为RabbitMQ只要一收到消息就会进行分发,并不会去管Consumer当前到底还有多少任务没完成确认。它只会盲目地将消息一一分发给对应的Consumer而已。

为了避免这种情况,我们可以调用basicQos方法并设置为prefetchCount = 1。这样就会告诉RabbitMQ同一时间只给Worker分发一条消息。换句话说就是,只有在Consumer完成一个任务并进行了消息确认后,再分派新的任务。这样RabbitMQ就会将任务分派给空闲的Worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

这里需要注意队列的大小

如果多有的Worker都处于忙碌状态,队列将会阻塞。这是你必须关注的,你可以增加Worker的数量或者采用其他策略来应对。

最终代码

NewTask.java如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

Worker.java如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

使用消息确认和消息读取数量限制这些持久化措施来初始化队列,可以让你的队列在RabbitMQ在重启的时候也能够保留。

有关Channel、Channel的更多内容,请看 KavaDocs online.

不适用生产环境免责声明

请记住,本教程以及其他的教程都仅仅是教程,每个教程为了阐述一些新概念,讲很多事情都做了简化。比如:连接管理、错误处理、重连、并发、指标收集等内容都做了很大程度的省略。这种简化省略的代码并不能应用于生产环境。

相关文章

网友评论

      本文标题:(翻译)RabbitMQ官网教程(2)WorkQueues

      本文链接:https://www.haomeiwen.com/subject/gptrdktx.html