美文网首页python3爬虫系列
聚沙成塔--爬虫系列(十六)(让CPU引擎轰鸣起来吧)

聚沙成塔--爬虫系列(十六)(让CPU引擎轰鸣起来吧)

作者: 爱做饭的老谢 | 来源:发表于2017-11-10 10:07 被阅读8次

版权声明:本文为作者原创文章,可以随意转载,但必须在明确位置标明出处!!!

tips:本基础系列旨在以爬虫带大家入门Python语言

前面文章讲到了多线程的使用和多线程的设计,也讲了多线程在I/O密集型中表现良好,本篇文章主要讲多进程的使用,如果你的程序涉及到大量的计算那么将你的程序设计成多进程的工作模式是你值得考虑的。多进程为什么能提高程序的处理能力呢?很好理解,就像你叫十个人一起去搬砖一样,一个人可能要搬一天,十个人可能一小时就搬完了,这就是多进程的优势。

multiprocessing

multiprocessing模块就是Python的多进程模块,Python2.6版本就将启引入作为标准库的一部分。它的接口与threading模块非常相似。当然该模块也提供了进程间传输数据的多钟方式,下面会介绍到。

多进程的启动方式

该模块支持三种方式来启动一个进程,分别是spawn、fork、forkserver。

  • spawn: 父进程启动一个新的Python解释器进程。 子进程只会继承运行进程对象run()方法所需的资源。 父进程中不必要的文件描述符和句柄将不会被继承。 与使用fork或forkserver相比,使用此方法启动进程相当慢。

  • fork:父进程使用os.fork()来派生Python解释器。 子进程与父进程有同时启动。 父进程的所有资源都由子进程继承,默认unix系统上使用此种方式,windows系统不支持此种方式

  • forkserver:当程序启动并选择forkserver启动方法时,启动服务器进程。 从此刻,无论何时需要一个新的进程,父进程都连接到服务器并请求它fork一个新的进程。 fork服务器进程是单线程的,所以使用os.fork()是安全的。
    选择一个启动进程的方法可以使用set_start_method方法

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Process,Pool

multiprocessing模块提供了两种方式来构造一个进程,一种是Process进程模式的,一种是Pool进程池模式的,Process的使用方式跟threading.Thread的使用方式是几乎是一模一样的,都提供了run、start、join、is_alive等方法,Pool进程池属于任务类型的了,对于任务模型还有一个更高级的主题concurrent.futures模块,感兴趣的同学可以去了解了解。

Process的使用

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Pool的使用

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

进程之间的数据交换

跟多线程模块一样,多进程同样提供了多种在进程间的数据交换,像Queue、Pipes、Value、Array, 其中Value、Array属于共享内存方式。

  • Queue的使用方式
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
  • Pipes的使用方式
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
  • Value,Array的使用方式
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Managers

Managers提供了一种方法来创建可以在不同进程之间共享的数据,包括在不同机器上运行的进程之间通过网络进行共享。 Managers对象控制一个服务进程中被管理的的对象。 其他进程可以通过使用代理来访问共享对象。如果你要在多进程中共享一个队列那么一定要用Managers,让其它进程通过代理的方式去访问这个队列,不然会出错哟...;Managers对象支持list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array,所以如果你要在多进程中共享消息那么尽量用Managers,举例:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

数据同步

multiprocessing模块与threading模块很相似,所以对于数据同步Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier,其用法也同threading差不多,想要了解更多的信息可以查看开发则文档。

进程池Pool

一个进程池对象,用于控制可以提交作业的工作进程池。 它支持超时和回调的异步结果,并具有并行映射实现。它提供了如下方法

  • apply(func[,args[,kwds]]):它是一个阻塞的方法,执行函数func,参数为args或者kwds直到结果返回为止
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):它是一个异步的方法,callback是一个回调函数,如果被指定了,那么当结果产生的时候该回调函数被调用,如果出错那么error_callback回调被调用
  • map(func, iterable[, chunksize]):它是一个阻塞的方法,从名字可以看出它跟内置的map函数很相似。它接收一个迭代器对象,像元组、list等。
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]]):它是一个异步的方法,参数需要一个迭代器类型的参数,callback是一个回调函数,如果被指定了,那么当结果产生的时候该回调函数被调用,如果出错那么error_callback回调被调用
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:      
        result = pool.apply(f, (10,)) 
        print(result)        

        result = pool.apply_async(f, (10,)) 
        print(result.get(timeout=1))

        print(pool.map(f, range(10)))       
     
        result = pool.map_async(f, range(10))
        
        print(result.get(timeout=1))

# 输出结果
100
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

okay,本篇对多进程介绍就到此结束了,读者一定要亲自上机验证才回有收获,一定不要有你觉得很简单它就真的简单了。实践是验证理论唯一的途径。


欢迎关注我:「爱做饭的老谢」,老谢一直在努力...

相关文章

网友评论

    本文标题:聚沙成塔--爬虫系列(十六)(让CPU引擎轰鸣起来吧)

    本文链接:https://www.haomeiwen.com/subject/ocgdmxtx.html