场景2:单发送多接收
使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。
准备
我们并没有像调整图片大小、渲染PDF这样真实的任务,所以我们将会发送一些字符串代表复杂的任务,通过使用Thread.sleep()来假装我们的业务系统很繁忙,小数点的数量代表这个任务的复杂性:每一个小数点代表一秒,比如,Hello...代表这个任务需要三秒。
我们稍微修改了一下上一篇的程序,来让他可以发送任意数量的消息,这个程序将会给我们的工作队列安排任务,所以我们给他命名为NewTash.java
我们之前的Recv.java也需要修改一下,他要根据消息体里的小数点数量来增加工作的秒数。他会传递消息并且执行他。改名为Worker.java
循环调度
使用任务队列的好处之一就是可以轻易的平行工作(即部署分布式worker),当面对积压了非常大量的消息的情况时,只需要增加worker的部署数量就可以轻易解决了。
我们尝试同时跑两个worker,即消息的接受者,他们会怎么样共同工作呢?
运行消息发送者,发送五条消息
消息接受者的情况如下:
worker1接收到了1、3、5条消息,worker2接受到了2、4条消息。
默认情况下,rabbitMq会按照顺序的方式给每个消费者发送消息,平均每个消费者都能得到相同数量的消息,这种方式叫做round-robin(循环??)。读者可以自行尝试三个甚至更多的消费者的情况。
消息答复
Worker接收到消息后,完成他的业务代码需要一些时间,你可能想知道在一个消费者接收到一个消息,然后执行业务代码到一部分的时候挂掉了会怎么办。根据我们现在的代码来说,rabbitMq把消息传递给消费者后就会把这些消息删除掉,在这种情况下,如果你干掉了一个worker,我们就会失去这个worker正在执行的,以及所有rabbitMq派发给他并且还没来得及执行的消息。
但是我们并不想失去任何的任务消息,如果一个worker挂掉了,我们想把这个worker头上的任务消息派发到其他的worker头上。
为了确保消息不会丢失,rabbitMq支持消息答复。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack给rabbitMq服务器告诉他我已经执行完成了,你可以把这条消息删除了。
如果一个消费者没有返回消息答复就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitMq就会明白,这个消息还没有被完成,rebbitMq就会重新把这条消息放入队列,如果在这个时间有其他的消费者在线,那么rabbitMq就会迅速的吧这条消息传递给其他的消费者,这样就确保了没有消息会丢失。
就算执行一个消息用了非常长的时间,也不会有任何问题。
手动消息答复默认是开启的,前面的例子我们通过autoAck=ture把他们关闭了。我们现在要把它设置为false,然后从一个worker那里发送一个合适的答复。
这样编码的话,就算你用Ctrl+C杀掉一个正在处理消息的worker进程也不会丢失任何消息,worker挂掉之后,没有答复的消息就会被自动重新传递。
消息持久化
我们已经学到了如何确保就算消费者挂掉消息也不会丢失。但是如果我们的ribbitMq服务器停了的话,我们的消息任务仍然会丢失。
当rabbitMq服务器停止或者崩溃的时候,它就会忘掉所有的队列和消息,除非你告诉它不要这么做。。要确保消息不会丢失我们要做两件事:我们需要使队列和消息持久化。
首先,我们要确保rbbitMq不会丢失我们的队列,我们要做的是声明队列为可持久化的。
尽管命令是正确的,基于我们目前的设置他也不会生效。因为我们已经定义了一个叫hello的非持久化的队列。rabbitMq不会允许你用不同的参数重新定义一个已经存在的队列,如果你这么做,会返回一个错误。我们这有一个快速的变通方案-我们什么一个不同名字的队列。如task_queue:
这个队列声明的变化需要同时应用于生产者和消费者的代码。
这个时候我们就相当确定这个叫task_queue的队列就算rabbitMq重启也不会丢失了。现在我们需要通过设置MessageProperties的值为PERSISTENT_TEXT_PLAIN.把我们的消息标记成可持久化的。
注意:把消息标记成持久化的并不能完全保证消息不会丢失。尽管他告诉了rabbitMq要把这条消息保存到磁盘上,但是仍然有少数情况rabbitMq接收到消息还没来得及保存它。需要更强壮的保证机制publisher confirms.
公平的分配机制
你可能注意到了现在的派发机制并没有像我们想象的那样工作。比如有两个worker的情况下,当所有奇数的消息都很重、偶数消息都很轻的时候,一个worker就会不断地繁忙工作,但是另一个就几乎不工作。但是这些对于rabbitMq服务器来说是不可知的。
这种情况是因为rabbitMq服务器并不去查看每个消费者未答复的消息的数量,它只是盲目的派发消息到消费者。
为了避免这种情况,我们可以使用basicQos方法,设置prefetchCount=1。这就告诉rabbitMq不要把多个消息同时派发给同一个worker。换句话说就是,在一个worker没有完成和答复前一个消息之前,不要给他派发新消息。相应的,它会把新消息派发给一个不忙的worker。
注意队列的大小:
当所有的worker都非常忙碌时,你的队列可能会被填满,所以你要注意你的队列大小,适当的增加更多的worker,或者使用其他的策略。
综合起来,代码如下。
翻译自:http://www.rabbitmq.com/tutorials/tutorial-two-java.html
参考:https://www.cnblogs.com/luxiaoxun/p/3918054.html
网友评论