美文网首页
【译】RabbitMQ教程二

【译】RabbitMQ教程二

作者: maxwellyue | 来源:发表于2017-07-17 23:07 被阅读4138次

    内容来自:RabbitMQ Tutorials Java版


    Work Queues

    在第一个教程中,我们实现了从一个指定的队列中发送和接收消息。在这一部分,我们将会创建一个工作队列:用来讲耗时的任务分发给多个工作者。

    工作队列的主要思想是避免这样的情况:直接去做一件资源密集型的任务,并且还得等它完成。相反,我们将任务安排到之后再去做。我们将任务封装为一个消息,并发到队列中。一个工作进程将会在后台取出任务并最终完成工作。如果开启多个工作进程,任务将会在这多个工作进程间共享。

    这个概念在web应用中是非常有用的,因为web应用不可能在一个HTTP请求中去处理一个复杂的任务。


    准备

    在上一个教程中,我们发送了“hello world”的消息。现在,我们会发送一些代表复杂任务的字符串。我们没有真实的任务(比如调整图片大小、PDF文件加载等),所以我们使用Thread.sleep()方法来伪造耗时任务,假装我们很忙。我们用字符串中的点号.来表示任务的复杂性,一个点就表示需要耗时1秒,比如一个描述为hello...的假任务,它需要耗时3秒。

    将上个教程中的Send.java中的代码稍作修改。因为这个程序会调度任务到工作队列,所以我们将它命名为NewTask.java

    String message = "1.";
    
    channel.basicPublish("", "hello", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    

    之前的Recv.java同样也要做些修改,它需要模拟消息中的点代表的耗时。因为它负责接收消息并处理任务,所以,将它命名为Worker.java

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
    
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
        }
      }
    };
    boolean autoAck = true; // acknowledgment is covered below
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    

    我们的假任务的执行:

    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }   
    

    循环分发

    使用任务队列的一个优势在于容易并行处理。如果积压了大量的工作,我们只需要添加更多的工作者(上文中的Worker.java中的概念),这样很容易扩展。

    首先,我们来尝试同时运行两个工作者实例(Worker.java)。它们都会从队列中获取消息,但具体是如何获取的呢?

    启动NewTask,之后,可以依次将message修改为"2.."、"3..."、"4...."、"5....."等,每修改一次就运行一次。
    观察console中两个工作者的接收消息情况:

    //其中之一的worker
     [x] Received '1.'
     [x] Done
     [x] Received '3...'
     [x] Done
     [x] Received '5....'
     [x] Done
     [x] Received '7....'
     [x] Done
    //另一个worker
     [x] Received '2..'
     [x] Done
     [x] Received '4....'
     [x] Done
     [x] Received '6....'
     [x] Done
     [x] Received '8....'
     [x] Done
    

    可以看出,默认情况下,RabbitMQ是轮流发送消息给下一个消费者,平均每个消费者接收到的消息数量是相等的。这种分发消息的方式叫做循环分发。你可以试一下开3个或更多工作者的情况。


    消息确认

    完成一项任务可能会耗费几秒钟,你可能会问,假如其中一个消费者开始了一个非常耗时的任务,并在执行这个任务的时候崩溃了(也就是没有完成这个任务),将会发生什么事情。按照上面的代码,一旦RabbitMQ向消费者发出消息,消息就会立即从内存中移除。在这种情况下,如果你杀死一个工作者,我们将会失去它正在处理的消息,同时也会丢失所有发给这个工作者但这个工作者还未处理的消息。

    但我们不想丢掉任务,如果一个工作者死掉,我们想将这个任务发给其他的工作者。

    为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者将会发送一个确认信息来告诉RabbitMQ,我已经接收到了消息,并且处理完了,你可以随便删它了。

    如果一个消费者在发送确认信息前死去(连接或通道关闭、TCP连接丢失等),RabbitMQ将会认为该消息没有被完全处理并会重新将消息加入队列。如果此时有其他的消费者,RabbitMQ很快就会重新发送该消息到其他的消费者。通过这种方式,你完全可以保证没有消息丢失,即使某个消费者意外死亡。

    对RabbitMQ而言,没有消息超时这一说。如果消费者死去,RabbitMQ将会重新发送消息。即使处理一个消息需要耗时很久很久也没有关系。

    消息确认机制是默认打开的。只是在前面的代码中,我们显示地关掉了:boolean autoAck=true。将代码做如下修改:

    channel.basicQos(1); // accept only one unack-ed message at a time (see below)
    
    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
    
        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    

    注意到最上面的那句代码:

    channel.basicQos(int prefetchCount);
    

    其中的参数prefetchCount表示:maximum number of messages that the server will deliver

    这样,就可以确保即使消费者挂了,消息也不会丢失。


    消息持久化

    通过上面的教程,我们知道如何确保消费者挂掉也不会丢失消息。但是,加入RabbitMQ服务器挂掉了怎么办?

    如果关闭RabbitMQ服务或者RabbitMQ服务崩溃了,RabbitMQ就会丢掉所有的队列和消息:除非你告诉它不要这样。要确保RabbitMQ服务关闭或崩溃后消息不会丢失,要做两件事情:持久化队列、持久化消息。

    首先,我们要确保RabbitMQ永远不会丢失我们的队列。怎么做呢?在声明队列的时候,指定durable参数为true。

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    

    尽管上面的代码没有错,但是它不会按所想的那样将队列持久化:因为之前我们已经将hello这个队列设置了不持久化,RabbitMQ不允许重新定义已经存在的队列,否则就会报错。但是,我们有一个快速的解决办法:声明另外一个队列就行了,只要不叫hello,比如task_queue

    boolean durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    

    现在,我们已经确保队列不会丢失了,那么如何将消息持久化呢:将MessageProperties的值设置为PERSISTENT_TEXT_PLAIN

    import com.rabbitmq.client.MessageProperties;
    
    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    

    将消息标记为持久化并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘中,但是在RabbitMQ接收到消息和保存消息之间会与一个很短的时间窗。同时,RabbitMQ不会为每个消息做fsync(2)处理,消息可能仅仅保存到缓存中而不会真正地写入到磁盘中。这种持久化保证尽管不够健壮,但已经远远足够我们的简单任务队列。如果你需要更强大的保证,可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)


    公平分发

    你可能已经发现,循环消息分发并不是我们想要的。比如,有两个工作者,当奇数消息(如上文中的"1..."、"3..."、"5..."、"7...")很耗时而偶数消息(如上文中的"2."、"4."、"6."、"8.")很简单的时候,其中一个工作者就会一直很忙而另一个工作者就会闲。然而RabbitMQ对这些一概不知,它只是在轮流平均地发消息。

    这种情况的发生是因为,RabbitMQ 只是当消息进入队列时就分发出去,而没有查看每个工作者未返回确认信息的数量。

    为了改变这种情况,我们可以使用basicQos方法,并将参数prefetchCount设为1。这样做,工作者就会告诉RabbitMQ:不要同时发送多个消息给我,每次只发1个,当我处理完这个消息并给你确认信息后,你再发给我下一个消息。这时候,RabbitMQ就不会轮流平均发送消息了,而是寻找闲着的工作者。

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

    注意,如果所有的工作者都很忙,你的队列可能会装满,你必须留意这种情况:或者添加更多的工作者,或者采取其他策略。

    完整代码:
    NewTask.java

    import java.io.IOException;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.MessageProperties;
    
    public class NewTask {
    
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv)
                          throws java.io.IOException {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
        String message = getMessage(argv);
    
        channel.basicPublish( "", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    
        channel.close();
        connection.close();
      }      
      //...
    }
    

    Worker.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Worker {
      private static final String TASK_QUEUE_NAME = "task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        channel.basicQos(1);
    
        final Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
              doWork(message);
            } finally {
              System.out.println(" [x] Done");
              channel.basicAck(envelope.getDeliveryTag(), false);
            }
          }
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
      }
    
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
          if (ch == '.') {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
    }
    

    说明

    ①与原文略有出入,如有疑问,请参考原文。
    ②原文是直接用javacp命令运行代码,用IDE更方便。

    相关文章

      网友评论

          本文标题:【译】RabbitMQ教程二

          本文链接:https://www.haomeiwen.com/subject/xnbqkxtx.html