美文网首页PythonPython学习
Python并发编程——多进程

Python并发编程——多进程

作者: xiaogp | 来源:发表于2021-05-16 17:38 被阅读0次

摘要:Python多进程

进程是操作系统中具体的处理任务,米一个进程都会有自己独立的内存空间,它是线程的载体。当一个程序启动时,会默认启动一个进程,将该进程装载到内存,同时在该进程中还会默认启动一个线程,来执行本进程中的内容。


创建多进程程序

在Python中进程的相关实现被封装在模块multiprocessing中,进程的创建方法和线程一样有两种

  • 直接通过multiprocessing中的进程类Process创建
  • 继承multiprocessing的Process类,重写run方法

类似于线程创建,对Process类实例化时直接将进程的处理函数传入即可,通过target来接收处理函数,创建好进程后通过实例化对象的start方法将其启动

(1)使用类实例化方法创建进程

定义进程处理函数 run_proc 。在实例化 Process 时 ,将 run_proc 传入 。接着使用实例化对象
start 方法启动进程,然后调用join代表该子进程结束之后主进程才能结束

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a", ))
    p2 = Process(target=run_proc, args=("b", ))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

输出如下,课件主进程和子进程的进程号pid都不一样,在调用join之后主进程阻塞直到所有子进程结束

main process: 12611
a start, pid:12612
b start, pid:12613
a end, pid:12612
b end, pid:12613
main process end...
(2)使用继承类的方式创建进程

实现自定义进程类使其继承于系统进程类 Process 。重写类中的 run 方法,

import time
import os
from multiprocessing import Process


def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))


class MyProcess(Process):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        run_proc(self.sid)


if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a",))
    p2 = Process(target=run_proc, args=("b",))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")

输出如下,和直接实例化Process类输出效果一致

main process: 14859
a start, pid:14860
b start, pid:14861
a end, pid:14860
b end, pid:14861
main process end...

创建进程池

当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候就要编写自己的线程池/进程池,在Python只有两个模块实现进程池,分别是concurrent.futuresmultiprocessing.Pool

(1)multiprocessing.Pool

传递给Pool一个参数设置进程池内的最大进程数,Pool有多个方法,主要是applyapply_asyncmapmap_asyncstarmapstarmap_async,区别如下

  • apply:单次同步执行,每次执行传入一个执行函数的参数,并且执行完毕才能执行下一个进程,如果执行函数有返回值返回最后一个执行完进程的值
  • apply_async:单次启动一个任务,传入一个执行函数的参数,异步执行,启动后不等这个进程结束又开始执行新任务,如果执行函数有返回值返回最后一个执行完进程的对象,需要调用get获得结果值
  • map:执行一个任务列表,传入执行函数的参数一个参数列表,同步执行,即主进程阻塞直到任务列表中的任务全部执行完毕,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致
  • map_async:执行一个任务列表,不需要等待任务列表中的任务执行完毕,主进程可以继续往下执行,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致
  • starmap:和map执行类似,区别是map对执行函数只能传入一个参数,而startmap可以传入多个参数,如果执行函数有返回值,返回一个结果列表,顺序和输入列表的元素顺序一致
  • starmap_async:和map_async执行类似,区别是map_async对执行函数只能传入一个参数,而starmap_async可以传入多个参数,如果执行函数有返回值,返回一个结果对象,需要调用get方法得到输出值,顺序和输入列表的元素顺序一致

除此之外,在执行完毕多进程任务后,调用close关闭Process对象,释放与之关联的所有资源,调用join阻塞主进程,直到调用join的子进程执行完毕再退出

下面用代码测试说明

  1. apply
import time
import datetime
import os
from multiprocessing import Pool


def run_proc(sid):
    time.sleep(3)
    print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply(run_proc, i)
    pool.close()
    pool.join()

输出如下,可见apply每次只输入一个执行函数的参数,即每次执行一次,多个参数组成的任务列表只能一个接着一个执行

a end, pid:25260, time:2021-05-16 18:15:08
b end, pid:25261, time:2021-05-16 18:15:11
c end, pid:25262, time:2021-05-16 18:15:14
d end, pid:25263, time:2021-05-16 18:15:17

2.apply_async
直接修改Pool的apply方法改为apply_assync

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        pool.apply_async(run_proc, i)
    print("main process")
    pool.close()
    pool.join()

输出如下,可见达到了并行效果,进程之间执行任务互补阻塞,主进程也没有被阻塞,main process被提前打印了出来

main process
a end, pid:25721, time:2021-05-16 18:17:33
b end, pid:25722, time:2021-05-16 18:17:33
c end, pid:25723, time:2021-05-16 18:17:33
d end, pid:25724, time:2021-05-16 18:17:33

3.map
修改Pool的方法为map

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

输出如下,可见map达到了并行的效果,但是主进程被阻塞,即map执行完任务列表后,主进程才执行

a end, pid:26939, time:2021-05-16 18:21:43d end, pid:26942, time:2021-05-16 18:21:43

b end, pid:26940, time:2021-05-16 18:21:43
c end, pid:26941, time:2021-05-16 18:21:43
main process

4.map_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    res = pool.map_async(run_proc, jobs)
    print("main process...")
    pool.close()
    pool.join()
    print(res.get())

输出如下,可见效果了map类似,区别是主进程没有被阻塞,main process被提前打印了出来

main process...
c end, pid:21401, time:2021-05-16 21:41:54
b end, pid:21400, time:2021-05-16 21:41:54a end, pid:21399, time:2021-05-16 21:41:54

d end, pid:21402, time:2021-05-16 21:41:54
['a', 'b', 'c', 'd']

5.starmap
将执行函数的入参改为两个,Pool函数改为starmap,并且传入的任务列表的每个参数也改为两个

def run_proc(sid, other):
    time.sleep(3)
    print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid) + str(other)


if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.starmap(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

输入如下,执行函数正确的接收了两个参数,主进程阻塞

b end, other:r, pid:27642, time:2021-05-16 18:24:52
a end, other:b, pid:27641, time:2021-05-16 18:24:52
c end, other:d, pid:27643, time:2021-05-16 18:24:52
d end, other:a, pid:27644, time:2021-05-16 18:24:52
main process

看一个map是否支持两个参数

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()

直接报错,执行函数缺少参数

TypeError: run_proc() missing 1 required positional argument: 'other'

6.starmap_async

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    res = pool.starmap_async(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
    print(type(res.get()))

输出如下,可见和starmap相比,主进程没有被阻塞,其他一样

main process
a end, other:b, pid:17123, time:2021-05-16 21:28:30
b end, other:r, pid:17124, time:2021-05-16 21:28:30
d end, other:a, pid:17126, time:2021-05-16 21:28:30
c end, other:d, pid:17125, time:2021-05-16 21:28:30
<class 'list'>

多线程和多进程的区别

多进程和多线程都可以用并行机制来提升系统的运行效率,二者的区别在于运行时所占的内存分布不同

  • 多线程是共用一套内存的代码块区间
  • 多进程是各用一套独立的内存区间

在大型计算机集群系统中,都会将多进程程序分布运行在不同的计算机上协同工作,而每一台机器上的进程内部,又会由多个线程来并行工作

相关文章

网友评论

    本文标题:Python并发编程——多进程

    本文链接:https://www.haomeiwen.com/subject/iqdrjltx.html