美文网首页JAVA后端架构
消息队列实践(二)——面对多个Consumer时的RabbitM

消息队列实践(二)——面对多个Consumer时的RabbitM

作者: 瑞瑞余之 | 来源:发表于2019-07-23 16:25 被阅读70次

    上一节介绍了针对一个producer、一个broker、一个consumer时,消息的发布、入队、获取的过程。本文将话题延深一步:其它因素不变,当存在多个consumer时,broker会如何处理。本篇的思路同于官网的Work Queue,可作为其中文解读版本。

    本文要实现的效果很简单:

    • 有多个consumer在后台运行
    • 当有一连串的message进入队列,RabbitMQ会自动分配给各个consumer;
      这种情况的应用场景也不少,比如:我们在小米的官网上抢购手机,选择了机型、颜色、套餐,还支付了预付款,当大量的用户,在同一时刻进行这个操作后,小米可能并不会立即处理,而是将这些请求转化成订单,进入队列,由后台的consumers处理。

    面对以上的实现效果,容易让人产生这样的疑虑:

    1. 这种情况有点类似于分布式当中的主从结构,那么RabbitMQ分配任务给consumer,会不会有负载均衡的效果,也就是说它是否会自动计算各个consumer的负载,进行科学的分配?
    2. 另外,一旦一个consumer挂掉,它所负责的队列任务会丢失掉,还是转移到其它的consumer?
    3. 如果RabbitMQ挂掉或者重启,那还未分配的任务会被丢掉吗?

    为了解决上面的疑惑,我们分3步来进行本节的实践:
    首先,建立1Producer + 1broker + 2*Consumer的代码结构,观察系统在默认情况下是如何分配任务,处理Consumer宕机和处理RabbitMQ宕机的;
    其次,通过修改各个Consumer的执行时间,模拟不同节点的处理任务能力,看系统会不会负载均衡;
    最后,对于不满足工程实践的漏洞,我们看看官方推荐的解决方案。

    代码实践

    1. 创建队列任务
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class NewTask {
    
      private static final String TASK_QUEUE_NAME = "a_task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    
            String message = String.join(" ", argv); //将数组转化为一个由空格分隔的字符串
    
            channel.basicPublish("", TASK_QUEUE_NAME,
                    null,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
      }
    
    }
    
    1. 创建执行任务的Consumer * 2
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Worker {
    
      private static final String TASK_QUEUE_NAME = "a_task_queue";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
    
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
    
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
      }
    
    //任务字符串中,每出现一个“.”,延迟一秒,用来模拟consumer处理需要的耗时
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
      }
    }
    
    1. 打开三个terminal,其中两个分别启动Work.java,作为两个consumer;之后再执行NewTask.java
      之前需要通过以下方式安装依赖,在NewTask.java和Worker.java的同级目录执行:
    wget https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.6.0/amqp-client-5.6.0.jar
    wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar
    wget https://repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.25/slf4j-simple-1.7.25.jar
    

    配置环境变量:

    export CP=.:amqp-client-5.6.0.jar:slf4j-api-1.7.25.jar:slf4j-simple-1.7.25.jar
    

    编译NewTask.java和Worker.java生成字节码文件:

    //再次提醒:要保证依赖和源文件在同一级目录
    javac -cp $CP NewTask.java Worker.java
    
    文件都在同一级目录
    此时我们让Worker跑起来,它们会等待任务发布,在terminal1和terminal2中分别执行:java -cp $CP Worker,在terminal3中执行java -cp $CP NewTask Message1 一直到 java -cp $CP NewTask Message5
    NewTask&Worker
    可以发现,RabbitMQ会交替的将任务给到下辖的Worker进行处理,我们可以看以下此时RabbitMQ的队列情况
    a_task_queue

    工程实践

    可以看到一旦Worker接收,队列中的message会被删除。这里可能存在一个隐患,如果NewTask在发送过程中,将其中一个Worker干掉,看这条message是否会丢失,此时我们重新执行Worker,且修改输入参数为: java -cp $CP NewTask Message5......,我们注意到在private方法doWork中,每一个“.”代表处理延迟一秒,这用来模拟系统在处理message的时候花费的时间

    //任务字符串中,每出现一个“.”,延迟一秒,用来模拟consumer处理需要的耗时
      private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
      }
    

    我们在NewTask打印了“[x] Sent 'Message num.........'”日志之后,关闭正在处理的Worker终端,如下图所示:

    干掉一个正在执行的Worker
    问题出现了!RabbitMQ中的queue因为Worker1已经接收到了Message 5......所以把对应的message删除了,但是Worker1在处理这个message还没有完成的时候(未打印done,可以看一下上文的代码细节)被干掉了。此时Message 5......即没有重新出现在queue中也没有被转派给Worker2,造成了数据丢失!这就是我们在文初提到的第二个问题:一旦一个consumer挂掉,它所负责的队列任务会丢失掉,还是转移到其它的consumer **
    目前看来答案是“丢失掉”,RabbitMQ对此进行了处理
    (划重点啦!!!)**:
    RabbitMQ提供了一种机制叫做—— message acknowledgments,我们不妨称它为消息认证,也就是说,当consumer收到了message,并且彻底处理完之后,会回传给broker一个ack标志,告诉broker可以从queue中删除对应的message了,如果broker发现有当和某个consumer断了之后,还没有收到message的ack,它就知道需要重新发送给其它的consumer(如果有的话)。这个功能实现起来并不复杂,修改Worker.java的对应代码就可以:
    //关闭自动认证,改为手动认证(其默认为自动认证,也就是一接收到message就认证成功)
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      String message = new String(delivery.getBody(), "UTF-8");
    
      System.out.println(" [x] Received '" + message + "'");
      try {
        doWork(message);
      } finally {
        System.out.println(" [x] Done");
    //最后,在打印完成之后,发出ack
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
    };
    

    在给出以上代码运行结果之前,我想展示一下,我在coding中遇到的一个小问题,大家要注意:我将autoAck = false,而忘记在日志后面手动发送认证,也就是做了上面第一处代码修改,而忘记修改第二处。这里导致了一个问题,因为系统不会自动返回ack,而我又没有手动添加,导致队列中unacknowledged的message越来越多,进入docker执行rabbitmqctl list_queues name messages_ready messages_unacknowleged看到即便已经处理过的message也没有被清除。如果这里不注意,会带来比较严重的后果,一是unack的message在与当前consumer断开连接后,它会重新发给其它consumer,对于我们的系统而言,它们其实已经被处理过,可能早上数据混乱;另一方面,不断增加的unack message会导致RabbitMQ吃掉更多内存。

    未确认的message数
    到目前为止,我们对broker(RabbitMQ)正常,Consumer异常而导致的数据丢失已经做了处理。那反过来思考一下:broker挂掉了那些还未分派的队列任务还会存在吗?
    读者可以模拟上文我coding时犯的错误,使得rabbitmq中存在unack message,然后再配合rabbitmqctl stop_apprabbitmqctl start_app 看之前unack message是否还在。这里我们略过过程直接给出结果:rabbitmq中数据丢失!
    rabbitmq数据丢失
    问题来了,我们如何解决,最直接的一个想法,就是queue message持久化嘛!这里分为两个方面:
    最基本的:rabbitmq中的队列不能丢失,比如上面演示中遇到的“队列不见了”,肯定要避免;
    另外:queue中的message不能丢失;
    对于这两个方面,Rabbitmq都有方式解决,首先,我们在声明队列的时候,需要将队列的duration设置为true,表示即便Rabbitmq挂掉后重启,该队列仍然存在!
    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    

    此外,我们在发布channel(NewTask.java)的时候需要设置message的持久化:

    import com.rabbitmq.client.MessageProperties;
    
    channel.basicPublish("", "a_task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    

    以上可以基本实现rabbitmq中数据不丢失,在这里回应开篇的第三个问题“如果RabbitMQ挂掉或者重启,那还未分配的任务会被丢掉吗?”
    answer:会被丢掉,但是可通过配置队列和持久化message解决!

    终于到了终极一问:“RabbitMQ分配任务给consumer,会不会有负载均衡的效果,也就是说它是否会自动计算各个consumer的负载,进行科学的分配?”
    我们做这样一个试验,Worker1执行一个耗时较长的任务,比如

    //NewTask.java to Worker1
    java -cp $CP NewTask Message 9...............................................................................
    

    Worker2执行耗时很短的任务:

    //NewTask.java to Worker2
    java -cp $CP NewTask Message 10..
    

    在Worker1还未完成任务之前,我们立即发布两次新任务

    //NewTask.java to some worker
    java -cp $CP NewTask Message 11..
    java -cp $CP NewTask Message 12..
    
    fair dispatch

    上图的结果,说明了问题:RabbitMQ默认会将队列任务均分到各个consumer,即便某一个consumer耗时较长,轮到它的任务不会负载均衡到其它节点,而是等待它执行完成,可以说RabbitMQ在分发任务时处于blindly dispatch(盲发)的状态。为了解决这个问题我们可以设置consumer与broker的chanel属性basicQos。比如

    channel.basicQos(1);
    

    意味着如果这个consumer存在一个或多个未完成的任务,包括正在执行的和unack的任务,则rabbitmq会去找下一个consumer,直到找到完全空置的consumer,才将任务分配给它。
    到此为止,我们回答了开篇提出的所有问题,读者可以从github上找到对应的完整源码(NewTask.java Worker.java),不妨动手试一试!

    相关文章

      网友评论

        本文标题:消息队列实践(二)——面对多个Consumer时的RabbitM

        本文链接:https://www.haomeiwen.com/subject/lsnwlctx.html