接着以前的笔记
消息确认
当处理一个比较耗时的任务的时候,我们不知道消费者是否会运行到一半的时候就挂掉。在以前的代码中,当消息被RabbitMQ发送给消费者之后,马上就会在内存中移除,这种情况下,如果把一个工作者终止,正在处理的消息就会丢失。同时,所有发送给这个工作者的还没有处理的消息也会丢失。
我们想不想丢失任何任务消息,该怎么办?为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到饼处理了这条小心,然后RabbitMQ就会是释放这条消息。
如果消费者挂掉,没有响应,RabbitMQ会认为这条消息没有完全处理会重新发送到队列中。
超时
在RabbitMQ中没有超时这个概念,当工作者断开连接时候,RabbitMQ就会重新发送消息。
消息响应默认是开启的,但是我们之前的参数设置的n0_ack=True将消息确认关闭了。现在只要删除这个参数使用RabbitMQ提供的默认就可以。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello')
特别注意:
一个很容易犯的错误就是忘记了basic_ack,后果很严重,如果worker在取出任务并完成了,但是没有设置basic_ack,RabbitMQ接收不到确认,就不能释放这个消息,就会占用很多内存,在未确认小夏积累过多。
排除这种错误可以使用下面命令或者直接在图形界面查看没有确认消息。
(winsows 在RabbitMQ安装目录下的sbin中打开cmd)
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
消息持久化
在RabbitMQ退出或者崩溃的时候,再次启动会丢失队列和消息,在没有设置消息持久化的情况下。为了确保信息不丢失,我们需要吧"队列"和 "消息"设置为持久化。
首先为了不让队列消失,将队列设置持久化。
channel.queue_declare(queue='task_queue', durable=True)
从代码中可以看出,queu参数后面的值变了,因为已经有一个队列名字为hello,是一个非持久化队列,所以我们不能在创建一个持久化队列hello。
现在我们的队列已经做到了持久化,但是消息还没有持久化设置--将deliveery_model属性设置为2
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
公平调度
从工作队列运行结果,我们可以看出,两个工作者处理奇数消息的比较繁忙,处理偶数消息的比较轻松。但是RabbitMQ不知道,他只是负责派发。
我们可是使用basic_qos方法并设置为prefeth_count=1。这样就让RabbitMQ在同一时刻不要发送超过一条的消息给一个Worker。知道worker处理完上一条消息并作出响应。
channel.basic_qos(prefetch_count=1)
解释
没有设置prefeth_count=1
如果没有设置prefeth_count=1,我们创建两个worker(worker1、worker2)和一个application。应用连续发送6个耗时(每一个任务耗时10s)任务到队列。然后再创建一个worker3,可以发现worker3没有分配到一个任务,所有的任务都被worker1和worker2完成。
这是因为RabbitMQ在同一时刻发送超过一条的消息给了工作者,所以在创建了worker3后,它没有消息可以接收。
设置prefeth_count=1
设置后,同样做上述的测试,我们可以发现,哪一个worker任务完成,RabbitMQ会为它再发送消息,不会出现上面的worker3接收不到消息。
整合代码
new_task.py
# coding:utf-8
import sys
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or 'Hello World!'
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
print ' [x] Sent %r' % (message,)
channel.close()
worker.py
# coding:utf-8
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
# 参数durable设置为True表明队列是持久化
# 如果只是在这个文件声明队列是持久化的,但是在另一个创建队列的地方没有声明持久化,就会报错
# 因为在创建队列的时候,创建的队列的名字是一样的,但是属性不一样,所以会报错
# 这样做只是实现了队列的持久化还需要将消息设置为持久化
channel.queue_declare(queue='task_queue', durable=True)
def callback(ch, method, properties, body):
print ' [x] Received %r' % (body,)
time.sleep(body.count('.'))
print ' [x] Done'
# 为什么加上这句话?
# 在没有加上这句话之前,发布者发送了四个任务到队列中,虽然都被消费者取走,但是图形化界面
# 上显示Unacked还是4,表示四个消息没有得到消费者的确认,虽然消费者取走了消息,但是没有发送确认
# 给RabbitMQ,所以四个消息会被重新发送到队列中,这样下去会占用内存。
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
待续。。。
参考文章:http://rabbitmq.mr-ping.com/tutorials_with_python/[2]Work_Queues.html
网友评论