美文网首页
2020-09-05 python分布式多进程

2020-09-05 python分布式多进程

作者: 昨天今天下雨天1 | 来源:发表于2020-09-05 14:53 被阅读0次

下面是一个简单的Master/Worker模型,实现一个简单的分布计算。如果要启动多个worker,就可以把任务分配到多台机器上了,

比如把计算n*n的代码替换成发送邮件,就实现了邮件队列的异步发送。

通过manager模块的支持,多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中。

注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。

比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

servers_managers.py

#!usr/bin/env python
# -*- coding: utf-8 -*-

import random, time, Queue, datetime
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support


# 建立2个队列,一个发送,一个接收
task_queue = Queue.Queue()
result_queue = Queue.Queue()


def get_task():
    return task_queue


def get_result():
    return result_queue


class QueueManager(BaseManager):
    pass


# 服务器的管理器上注册2个共享队列
QueueManager.register('get_task', callable=get_task)
QueueManager.register('get_result', callable=get_result)
# 设置端口,地址默认为空。验证码authkey需要设定。
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')


def manager_run():
    manager.start()
    # 通过管理器访问共享队列。
    task = manager.get_task()
    result = manager.get_result()

    # 对队列进行操作, 往task队列放进任务。
    for _ in range(10):
        n = random.randint(0, 100)
        print 'Put task {}'.format(n)
        task.put(n)

    # 从result队列取出结果
    print 'Try get result...'
    try:
        for _ in range(10):
            # 超时时间
            r = result.get(timeout=10)
            print 'Result: {}'.format(r)
    except Queue.Empty:
        print 'result is empty'

    # 关闭管理器。
    manager.shutdown()
    print 'master exit.{}'.format(datetime.datetime.now())


if __name__ == '__main__':
    freeze_support()
    manager_run()

client_managers.py

#!usr/bin/env python
# -*- coding: utf-8 -*-

import sys, time, Queue, datetime, os
from multiprocessing.managers import BaseManager
from multiprocessing import Pool


class QueueManager(BaseManager):
    pass


# 从网络上的服务器上获取Queue,所以注册时只提供服务器上管理器注册的队列的名字:
QueueManager.register('get_task')
QueueManager.register('get_result')

server_addr = '10.2.81.177'
print 'Connect to server {}...'.format(server_addr)

# b'abc'相当于'abc'.encode('ascii'),类型是bytes
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')

# 连接服务器
m.connect()

# 获得服务器上的队列对象
task = m.get_task()
result = m.get_result()


def work():
    # 从task队列取数据,并添加到result队列中
    while True:
        try:
            if task.empty():
                time.sleep(1)
                break
            else:
                n = task.get(timeout=1)
                print 'run task %d * %d...' % (n, n)
                r = '%d * %d = %d' % (n, n, n * n)
                time.sleep(1)
                result.put(r)
        except EOFError:
            print 'EOFError'


# 多进程运行创建随机文件并加写锁
pool = Pool(processes=3)
for i in range(3):
    # func给的方法对象,args给方法参数
    pool.apply_async(func=work)
pool.close()
pool.join()

print 'worker exit.{}'.format(datetime.datetime.now())
os._exit(1)

相关文章

网友评论

      本文标题:2020-09-05 python分布式多进程

      本文链接:https://www.haomeiwen.com/subject/ykcfektx.html