美文网首页
python多任务--进程池Pool

python多任务--进程池Pool

作者: 小啊小狼 | 来源:发表于2020-10-19 09:53 被阅读0次

进程池Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

一:使用进程池

例1:非阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
    po = Pool(5)  # 定义一个进程池,最大进程数5

    # 往进程池中添加任务
    for i in range(10):
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply_async(worker, (f'liang{i}',))

    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print("----all_done----")

if __name__ == '__main__':
    main()

执行结果

----start----
liang0开始执行,进程号为10404
liang1开始执行,进程号为9920
liang2开始执行,进程号为13136
liang3开始执行,进程号为10180
liang4开始执行,进程号为7708
liang4 执行完毕,耗时0.57
liang5开始执行,进程号为7708
liang2 执行完毕,耗时1.20
liang6开始执行,进程号为13136
liang1 执行完毕,耗时1.33
liang7开始执行,进程号为9920
liang0 执行完毕,耗时1.34
liang8开始执行,进程号为10404
liang3 执行完毕,耗时1.96
liang9开始执行,进程号为10180
liang5 执行完毕,耗时1.73
liang9 执行完毕,耗时0.54
liang8 执行完毕,耗时1.28
liang7 执行完毕,耗时1.37
liang6 执行完毕,耗时1.88
----all_done----

函数解释:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表, kwds为传递给func的关键字参数列表;

  • apply(func[, args[, kwds]]):使用阻塞方式调用func

  • close():关闭Pool,使其不再接受新的任务;

  • terminate():不管任务是否完成,立即终止;

  • join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用;

执行说明:创建一个进程池pool,并设定进程的数量为5,range(10)会相继产生10个对象,10个对象被提交到pool中,因pool指定进程数为5,所以0、1、2、3、4会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象,继续去执行新的对象,所以会出现输出“liang5开始执行,进程号为7708”出现在"liang4 执行完毕,耗时0.57"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“----start----”,主程序在pool.join()处等待各个进程的结束。

例2:阻塞

from multiprocessing import Pool
import os, time, random


def worker(name):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(name, "执行完毕,耗时%0.2f" % (t_stop - t_start))

def main():
    po = Pool(3)  # 定义一个进程池,最大进程数3

    # 往进程池中添加任务
    for i in range(0, 5):
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        po.apply(worker, (f'liang{i}',))

    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    print("----all_done----")

if __name__ == '__main__':
    main()

输出

liang0开始执行,进程号为1976
liang0 执行完毕,耗时1.75
liang1开始执行,进程号为12624
liang1 执行完毕,耗时0.57
liang2开始执行,进程号为12444
liang2 执行完毕,耗时0.52
liang3开始执行,进程号为1976
liang3 执行完毕,耗时1.23
liang4开始执行,进程号为12624
liang4 执行完毕,耗时0.85
----start----
----all_done----

因为是阻塞,主函数会等待进程的执行,执行完之后才会继续往下,所以运行完所有进程后才输出“----start----”

例3、使用进程池,并返回结果

from multiprocessing import Pool
import os, time, random


def worker(name):
    print("%s开始执行,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)
    return name,os.getpid()

def main():
    po = Pool(3)  # 定义一个进程池,最大进程数3

    res=[]
    # 往进程池中添加任务
    for i in range(0, 5):
        # Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))
        # 每次循环将会用空闲出来的子进程去调用目标
        res.append(po.apply_async(worker, (f'liang{i}',)))

    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后
    for result in res:
        print(result.get())  #get()函数得出每个返回结果的值
    print("----all_done----")

if __name__ == '__main__':
    main()

输出结果:

----start----
liang0开始执行,进程号为14012
liang1开始执行,进程号为13000
liang2开始执行,进程号为14120
liang3开始执行,进程号为14012
liang4开始执行,进程号为14012
('liang0', 14012)
('liang1', 13000)
('liang2', 14120)
('liang3', 14012)
('liang4', 14012)
----all_done----

例4、多进程执行多个任务

from multiprocessing import Pool
import os, time, random


def worker1(name):
    print("%s开始执行work1,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)

def worker2(name):
    print("%s开始执行work2,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)

def worker3(name):
    print("%s开始执行work3,进程号为%d" % (name, os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random() * 2)

def main():
    po = Pool(4)  # 定义一个进程池,最大进程数3

    work_list=[worker1,worker2,worker3]
    # 往进程池中添加任务
    for work in work_list:
        for i in range(3):
            po.apply_async(work, (f'liang{i}',))

    print("----start----")
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,必须放在close语句之后

    print("----all_done----")

if __name__ == '__main__':
    main()

线程池4个线程执行3个任务,每个任务执行3次。
输出:

----start----
liang0开始执行work1,进程号为13088
liang1开始执行work1,进程号为4908
liang2开始执行work1,进程号为4200
liang0开始执行work2,进程号为8124
liang1开始执行work2,进程号为4908
liang2开始执行work2,进程号为13088
liang0开始执行work3,进程号为8124
liang1开始执行work3,进程号为4200
liang2开始执行work3,进程号为4908
----all_done----

二、进程池进程之间的通讯

进程池中进程的通讯队列
from multiprocessing import Pool, Manager
q = Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager


def work(name, q):
    time.sleep(1)
    print(f"{name}:---{os.getpid()}---{q.get()}")

def main():
    # 创建一个用于进程池通信的队列
    q = Manager().Queue()

    for i in range(1000):
        q.put(f'data-{i}')

    # 创建一个拥有五个进程的进程池
    po = Pool(5)
    # 往进程池中添加20个任务
    for i in range(20):
        po.apply_async(work, (f'liang{i}', q))

    # close:关闭进程池(进程池停止接收任务)
    po.close()
    # 主进程等待进程池中的任务结束再往下执行
    po.join()

if __name__ == '__main__':
    main()

相关文章

网友评论

      本文标题:python多任务--进程池Pool

      本文链接:https://www.haomeiwen.com/subject/yamnpktx.html