工作队列
ps: 使用pika python客户端
前面写了一个生产者将消息发送到队列中,消费者从队列中取出消息的程序。现在我将创建一个工作队列(Work Queue),这个队列会发送一些耗时的任务给工作者(Working)。
工作队列又叫任务队列(Task Queue)是为了避免等待一些占用大量资源、时间的操作。当我们把任务当作消息发送到队列中,一个运行在后台的工作者取出任务后处理,当我们运行很多工作者,然乌就会在他们之间共享,就是队列会平均将任务分发给工作者。
准备
这次我们模拟比较耗时的任务发送到队列中,使用sleep.time()函数赖模拟耗时任务。在消息中假如点号(.)一个点号标识耗时一秒,hello...就会耗时三秒。
new_task.py
# coding: utf-8
import pika
import sys
message = ' '.join(sys.argv[1:]) or 'Hello world!'
channel.basic_publicsh(exchange='',
routing_key='hello',
body=message)
print ' [x] Sent %r' % (message, )
将我们之前写的receiver.py文件中的代码调整一下,命名为worker.py。
# coding: utf-8
import time
def callback(chm method, properties, body):
print ' [x] Received %r' % (body, )
time.sleep(body.count('.'))
print ' [x] Done'
循环调度
使用队列的好处就是队列能够并行处理任务,如果积累了很多任务,只需要增加工作者就可以。
我们现在运行脚本,打开两个终端运行worker.py文件,标识两个Worker。
- 终端1
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
- 终端2
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
再运行一个终端来发布任务。
- 终端3
$ python new_task.py 1 message.
$ python new_task.py 2 message..
$ python new_task.py 3 message...
$ python new_task.py 4 message....
$ python new_task.py 5 message.....
我们来看工作者接收到什么:
worker1:
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received '1 message.'
[x] Received '3 message...'
[x] Received '5 message.....'
worker2:
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received '2 message.'
[x] Received '4 message...'
从运行结果可以看出,这两个工作者(Worker)接收到消息是又队列按照顺序分发的(worker1先于2运行),这种发送消息的方式叫做--轮询。
待续。。。
参考文章:http://wiki.jikexueyuan.com/project/rabbitmq/hello-world.html
网友评论