下面是一个简单的Master/Worker模型,实现一个简单的分布计算。如果要启动多个worker,就可以把任务分配到多台机器上了,
比如把计算n*n的代码替换成发送邮件,就实现了邮件队列的异步发送。
通过manager模块的支持,多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中。
注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。
比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。
servers_managers.py
#!usr/bin/env python
# -*- coding: utf-8 -*-
import random, time, Queue, datetime
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 建立2个队列,一个发送,一个接收
task_queue = Queue.Queue()
result_queue = Queue.Queue()
def get_task():
return task_queue
def get_result():
return result_queue
class QueueManager(BaseManager):
pass
# 服务器的管理器上注册2个共享队列
QueueManager.register('get_task', callable=get_task)
QueueManager.register('get_result', callable=get_result)
# 设置端口,地址默认为空。验证码authkey需要设定。
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
def manager_run():
manager.start()
# 通过管理器访问共享队列。
task = manager.get_task()
result = manager.get_result()
# 对队列进行操作, 往task队列放进任务。
for _ in range(10):
n = random.randint(0, 100)
print 'Put task {}'.format(n)
task.put(n)
# 从result队列取出结果
print 'Try get result...'
try:
for _ in range(10):
# 超时时间
r = result.get(timeout=10)
print 'Result: {}'.format(r)
except Queue.Empty:
print 'result is empty'
# 关闭管理器。
manager.shutdown()
print 'master exit.{}'.format(datetime.datetime.now())
if __name__ == '__main__':
freeze_support()
manager_run()
client_managers.py
#!usr/bin/env python
# -*- coding: utf-8 -*-
import sys, time, Queue, datetime, os
from multiprocessing.managers import BaseManager
from multiprocessing import Pool
class QueueManager(BaseManager):
pass
# 从网络上的服务器上获取Queue,所以注册时只提供服务器上管理器注册的队列的名字:
QueueManager.register('get_task')
QueueManager.register('get_result')
server_addr = '10.2.81.177'
print 'Connect to server {}...'.format(server_addr)
# b'abc'相当于'abc'.encode('ascii'),类型是bytes
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 连接服务器
m.connect()
# 获得服务器上的队列对象
task = m.get_task()
result = m.get_result()
def work():
# 从task队列取数据,并添加到result队列中
while True:
try:
if task.empty():
time.sleep(1)
break
else:
n = task.get(timeout=1)
print 'run task %d * %d...' % (n, n)
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
except EOFError:
print 'EOFError'
# 多进程运行创建随机文件并加写锁
pool = Pool(processes=3)
for i in range(3):
# func给的方法对象,args给方法参数
pool.apply_async(func=work)
pool.close()
pool.join()
print 'worker exit.{}'.format(datetime.datetime.now())
os._exit(1)
网友评论