美文网首页python
python 多进程(一)multiprocessing.Pro

python 多进程(一)multiprocessing.Pro

作者: eeert2 | 来源:发表于2019-08-21 14:07 被阅读0次

    该文章基于 python3.7,部分功能只有python3.7能实现

    一、 进程模块multiprocessing

    多进程可以实现多个程序的并行,充分利用计算机的资源,在不同的平台/操作系统上,python实现多进程的方式不同

    在Unix/Linux 中,通过fork()调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

    Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

    import os
    
    print('Process (%s) start...' % os.getpid())
    # Only works on Unix/Linux/Mac:
    pid = os.fork()
    if pid == 0:
        print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
    else:
        print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
    

    运行结果如下:

    Process (876) start...
    I (876) just created a child process (877).
    I am child process (877) and my parent is 876.
    

    由于Windows没有fork调用,上面的代码在Windows上无法运行,我们可以使用multiprocessing模块,其封装了底层复制进程的过程,Unix 和 Windows 上都可以运行。

    根据不同的平台, multiprocessing 支持三种启动进程的方法。

    • spawn
      父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。
      可在Unix和Windows上使用。 Windows上的默认设置。

    • fork
      父进程使用 os.fork() 来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。
      只存在于Unix。Unix中的默认值。

    • forkserver
      程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。没有不必要的资源被继承。
      可在Unix平台上使用,支持通过Unix管道传递文件描述符。

    二、进程对象Process

    进程模块multiprocessing中包含与进程相关的异常、同步、通信等等相关,其中Process封装了进程对象的相关API,是一个子进程的物化实现,封装了子进程状态与管理相关功能。

    1. 如何创建一个子进程对象
    Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
    

    应始终使用关键字参数调用构造函数,而不是 None, None, p1……这样来传入参数,否则可能造成不可知错误。

    • group 应该始终是 None ;它仅用于兼容性考虑
    • target 传入一个可调用对象。它默认为 None,这里传入的是子进程的运行函数。
    • name 是进程名称,仅仅具有标识作用,并不会改变操作系统中的进程名称。
    • args 是目标调用的参数元组,也就是target调用函数的参数
    • kwargs 是目标调用的关键字参数字典,也是target调用函数的参数
    • daemon 将进程 daemon 标志设置为 TrueFalse 。如果是 None (默认值),则该标志将从创建的进程继承

    示例:创建子进程,并显示子进程和父进程的的进程ID

    from multiprocessing import Process
    import os
    
    
    def child_main(name):
        print('I am', name, 'process id:', os.getpid())
        print('parent process:', os.getppid())
    
    
    if __name__ == '__main__':
        print('main process id:', os.getpid())
        p = Process(target=child_main, args=('bob',))
        p.start()
        p.join()
    

    打印结果

    main process id: 2649
    I am bob process id: 2650
    parent process: 2649
    

    从程序运行来看

    p = Process(target=child_main, args=('bob',))
    

    创建了一个子进程,该子进程执行child_main方法,方法的参数为'bob'p为该子进程对象的物化实现,封装子进程相关的功能。

    1. Process对象的常用方法
    • start()
      启动进程,并调用run()方法。

    • run()
      表示进程活动的方法,在可以在子类中重载此方法。标准 run() 方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 argskwargs 参数中获取顺序和关键字参数。

    • join([timeout])
      如果可选参数 timeoutNone (默认值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回 None 。检查进程的 exitcode 以确定它是否终止。

    • name
      进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。

    • is_alive()
      返回进程是否还活着。
      粗略地说,从 start() 方法返回到子进程终止之前,进程对象仍处于活动状态。

    • daemon
      进程的守护标志,一个布尔值。这必须在 start() 被调用之前设置。
      当进程退出时,它会尝试终止其所有守护进程子进程。
      请注意,不允许守护进程创建子进程。否则,守护进程会在子进程退出时终止其子进程。 另外,这些 不是 Unix守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。

    • pid
      返回进程ID。在生成该进程之前,这将是 None 。

    • exitcode
      子进程退出代码。如果进程尚未终止,这将是 None 。负值 -N 表示孩子被信号 N 终止。

    • terminate()
      终止进程。 在Unix上,这是使用 SIGTERM 信号完成的;在Windows上使用 TerminateProcess() 。 请注意,不会执行退出处理程序和finally子句等
      请注意,进程的后代进程将不会被终止 —— 它们将孤儿进程。参考进程基础

    • kill()
      terminate() 相同,但在Unix上使用 SIGKILL 信号
      3.7 新版功能.

    • close()
      关闭 Process 对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发 ValueError 。一旦 close() 成功返回, Process 对象的大多数其他方法和属性将引发 ValueError
      3.7 新版功能.

    注意 start()join()is_alive()terminate()exitcode 方法只能由创建进程对象的进程调用。

    使用实例 1,利用多进程实现timeout 函数

    在使用 爬虫 的相关技术中,有很多方法都具有 timeout 参数,也可以利用多进程实现timeout 函数,思路如下:
    将运行函数放到子进程中运行,在主进程中等待子进程执行join([timeout]),然后判断子进程的状态 is_alive(),如果为真,说明子进程还在运行,已经超过我们的限制时间,则打断子进程,并在主进程中抛出异常。
    实现如下:
    这里在函数 run_limit()中包含一个辅助函数,我们也可以将辅助函数作为一个参数,可以实现对所有函数转成timeout 函数

    from multiprocessing import Process
    import time
    
    
    def run_limit(timeout=5):
        def fun():
            i = 0
            while True:
                time.sleep(1)
                i += 1
                print(i)
    
        p = Process(target=fun, )
        p.start()
        p.join(timeout=timeout)
        if p.is_alive():
            p.terminate()
            raise TimeoutError(f'运行超时{timeout}!')
    
    
    if __name__ == '__main__':
        run_limit()
    

    注意,这里每执行该函数都会启动一个子进程,较普通函数有较大的消耗。

    使用实例 2,利用守护进程daemon实现心跳机制

    背景:

    心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制。

    在Linux系统中,计算机刚启动时只有一个进程,PID为1,名字为init(centos6系统)或者systemd(centos7系统),systemd进程通过复制自身进程启动了其它进程,后面再复制出整个计算机进程,所以进程被设计成独立的,也就是说父进程关闭了,子进程照样能正常运行,这是非常必要的,不至于其中一个进程挂了,让它的子孙后代进程都挂掉。

    但如果我们在主进程中开启一个子进程用于向远方服务器报告,这种进程间的独立就不符合我们的期望,而daemon参数可以实现主进程与子进程的绑定,主进程结束,守护进程的子进程也结束,如下

    from multiprocessing import Process
    import time
    
    
    def child_main():
        while True:
            print('i am live')
            time.sleep(3)
    
    
    if __name__ == '__main__':
        p = Process(target=child_main, daemon=True)
        p.start()
        time.sleep(10)
        print('main end')
    

    这里没有使用join()方法等待子进程,所以在运行10秒后主进程结束,
    由于参数daemon=True ,所以子进程在主进程结束后也跟着结束了。

    相关文章

      网友评论

        本文标题:python 多进程(一)multiprocessing.Pro

        本文链接:https://www.haomeiwen.com/subject/fyubjctx.html