Python Beginner Series: Multiprocessing


Author: Java丶python攻城狮 | Published 2019-02-13 18:51

Differences between multiprocessing and multithreading

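In CPython, the GIL (Global Interpreter Lock) lets only one thread execute Python bytecode at a time, so multithreaded code is not very efficient and cannot take full advantage of a multi-core CPU. Threads only achieve real concurrency for I/O-bound tasks, where they spend most of their time waiting.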

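For CPU-bound logic (usually computation), writing the same code with multiple processes is much more efficient on a multi-core CPU, because each process has its own interpreter and its own GIL and can run on a separate core.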

The trade-off is that the operating system pays a much higher cost to create and schedule processes than threads.

Comparing multithreading and multiprocessing with examples

1. Compute Fibonacci numbers with both processes and threads. Both versions use the thread and process pools provided by the concurrent.futures module.

    import time
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed

    def fib(n):
        return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

    if __name__ == '__main__':
        # start timing before submitting: tasks begin running as soon as they are submitted
        start_time = time.time()
        # with ProcessPoolExecutor(3) as executor:
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(fib, n) for n in range(25, 35)]
            for future in as_completed(all_task):
                data = future.result()
                # todo: use data
        end_time = time.time()
        print("time consuming by threads: {0}s".format(end_time - start_time))
        # print("time consuming by processes: {0}s".format(end_time - start_time))

Results of the two approaches:

    # result:
    # time consuming by threads: 4.823292016983032s
    # time consuming by processes: 3.3890748023986816s

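As you can see, for compute-heavy tasks multiprocessing is more efficient than multithreading. The example also shows that the thread pool and the process pool in concurrent.futures have the same interface and are used the same way. When using processes on Windows (and on macOS since Python 3.8), however, the process-related logic must be placed under if __name__ == '__main__':, because child processes there are started by re-importing the main module (the "spawn" start method). Linux, where the default has traditionally been to fork the parent, does not impose this constraint, although using the guard everywhere is a safe habit.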
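Because the two pools share one interface, the timing code can be written once and given either executor class. A minimal sketch; the helper name `run_with` and the smaller input range (so it finishes quickly) are illustrative choices, not part of the original:

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed


def fib(n):
    # Deliberately slow recursive Fibonacci: pure CPU work
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)


def run_with(executor_cls, workers, numbers):
    """Hypothetical helper: run fib() over numbers with the given executor class.

    Returns (results in input order, elapsed seconds).
    """
    start = time.perf_counter()
    with executor_cls(workers) as executor:
        # Map each future back to its input so results can be put back in order
        futures = {executor.submit(fib, n): n for n in numbers}
        results = {futures[f]: f.result() for f in as_completed(futures)}
    elapsed = time.perf_counter() - start
    return [results[n] for n in numbers], elapsed


if __name__ == '__main__':
    numbers = list(range(20, 26))
    thread_results, thread_time = run_with(ThreadPoolExecutor, 3, numbers)
    process_results, process_time = run_with(ProcessPoolExecutor, 3, numbers)
    print("threads:   {0:.3f}s".format(thread_time))
    print("processes: {0:.3f}s".format(process_time))
```

Mapping each future back to its input keeps the results in input order even though as_completed() yields futures in completion order.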

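2. Simulate I/O-bound work with both processes and threads. The defining feature of I/O is that the CPU spends most of its time waiting for data, so simply calling sleep() is enough to simulate it here.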

The overall structure is unchanged; the modified parts are:

    def random_sleep(n):
        time.sleep(n)
        return n

    ...

        # 8 workers (processes or threads), 30 tasks that each sleep 2 seconds to simulate I/O
        # with ThreadPoolExecutor(8) as executor:
        with ProcessPoolExecutor(8) as executor:
            all_task = [executor.submit(random_sleep, 2) for i in range(30)]

    # result:
    # time consuming by threads: 8.002903699874878s
    # time consuming by processes: 8.34946894645691s
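Both timings land near 8 s because the tasks spend their time waiting rather than computing, so the number of workers, not the CPU, sets the pace: with 8 workers, 30 tasks of 2 s each run in ceil(30 / 8) = 4 rounds. A quick check of that estimate; `estimate_io_time` is an illustrative helper that assumes every task takes the same time and ignores process start-up cost (which likely explains why the process pool comes out slightly slower):

```python
import math


def estimate_io_time(tasks, workers, seconds_per_task):
    # Hypothetical helper: equal-length tasks run in "rounds" of at most
    # `workers` at a time, and each round lasts one task's duration
    rounds = math.ceil(tasks / workers)
    return rounds * seconds_per_task


print(estimate_io_time(30, 8, 2))  # 8
```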

Programming with multiprocessing

Using Process directly

    import time
    import multiprocessing

    def read(times):
        time.sleep(times)
        print("process reading...")
        # a Process target's return value is discarded; the parent never sees it
        return "read for {0}s".format(times)

    def write(times):
        time.sleep(times)
        print("process writing...")
        return "write for {0}s".format(times)

    if __name__ == '__main__':
        read_process = multiprocessing.Process(target=read, args=(1,))
        write_process = multiprocessing.Process(target=write, args=(2,))
        read_process.start()
        write_process.start()
        print("read_process id {rid}".format(rid=read_process.pid))
        print("write_process id {wid}".format(wid=write_process.pid))
        # wait for both child processes to finish
        read_process.join()
        write_process.join()
        print("done")

    # result:
    # read_process id 7064
    # write_process id 836
    # process reading...
    # process writing...
    # done

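As you can see, multiprocessing.Process is used in much the same way as a thread. Remember that on Windows the process-related logic must go under if __name__ == '__main__'. For the other methods, see the official documentation.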
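A few of those other members are handy for checking on a child process: is_alive(), exitcode and name. A minimal sketch with a shorter sleep; the process name "reader" is an illustrative choice:

```python
import multiprocessing
import time


def read(times):
    time.sleep(times)
    # Process discards this return value; the parent never receives it
    return "read for {0}s".format(times)


if __name__ == '__main__':
    # name= only labels the process, e.g. for logging
    p = multiprocessing.Process(target=read, args=(0.2,), name="reader")
    print(p.name, p.is_alive())      # reader False  (not started yet)
    p.start()
    print(p.is_alive())              # normally True: the child is still sleeping
    p.join()
    # exitcode: None while running, 0 after a normal exit,
    # -N if the child was terminated by signal N (Unix)
    print(p.is_alive(), p.exitcode)  # False 0
```

Since the return value is lost, getting data back from a plain Process requires one of the communication tools covered under inter-process communication.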

Using the built-in process pool (multiprocessing.Pool)

    import time
    import multiprocessing

    def read(times):
        time.sleep(times)
        print("process reading...")
        return "read for {0}s".format(times)

    def write(times):
        time.sleep(times)
        print("process writing...")
        return "write for {0}s".format(times)

    if __name__ == '__main__':
        # multiprocessing.cpu_count() returns the number of CPU cores
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        read_result = pool.apply_async(read, args=(2,))
        write_result = pool.apply_async(write, args=(3,))
        # close the pool so it accepts no new tasks; calling join() before close() raises an error
        pool.close()
        # wait for all submitted tasks to finish
        pool.join()
        # get() returns each task's return value
        print(read_result.get())
        print(write_result.get())

    # result:
    # process reading...
    # process writing...
    # read for 2s
    # write for 3s
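Instead of calling get() afterwards, apply_async() also accepts callback= (and error_callback=) functions that the parent process runs as each result becomes ready. A sketch with shorter sleeps; the list name `collected` is illustrative:

```python
import multiprocessing
import time


def read(times):
    time.sleep(times)
    return "read for {0}s".format(times)


if __name__ == '__main__':
    collected = []  # filled in the parent by the pool's result-handling thread

    pool = multiprocessing.Pool(2)
    for t in (0.2, 0.1):
        # callback receives the task's return value; error_callback receives the exception
        pool.apply_async(read, args=(t,), callback=collected.append,
                         error_callback=print)
    pool.close()
    pool.join()  # once join() returns, every callback has run
    print(sorted(collected))  # ['read for 0.1s', 'read for 0.2s']
```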

Using imap(): the tasks still run in parallel, but results are returned in the order the tasks were submitted:

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap(read, [2, 1, 3]):
        print(result)

    # result:
    # process reading...
    # process reading...
    # read for 2s
    # read for 1s
    # process reading...
    # read for 3s
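The output above shows both properties at once: the 1 s task prints "process reading..." before the 2 s task finishes, yet "read for 2s" is yielded first. A shorter sketch of the same behaviour, using the Pool as a context manager (leaving the with-block calls terminate(), which is safe here because every result has already been consumed):

```python
import multiprocessing
import time


def read(times):
    time.sleep(times)
    return "read for {0}s".format(times)


if __name__ == '__main__':
    with multiprocessing.Pool(3) as pool:
        start = time.perf_counter()
        # the three tasks sleep concurrently, but results follow the input order
        results = list(pool.imap(read, [0.2, 0.1, 0.3]))
        elapsed = time.perf_counter() - start
    print(results)  # ['read for 0.2s', 'read for 0.1s', 'read for 0.3s']
    # typically well under the 0.6s a sequential run would need
    print("elapsed: {0:.2f}s".format(elapsed))
```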

Using imap_unordered(): whichever task finishes first has its result returned first:

    for result in pool.imap_unordered(read, [1, 5, 3]):
        print(result)

    # result:
    # process reading...
    # read for 1s
    # process reading...
    # read for 3s
    # process reading...
    # read for 5s
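Because imap_unordered() gives up the input order, a common pattern is to return the input's index along with the result. A sketch of that idea; the function name `read_tagged` is illustrative:

```python
import multiprocessing
import time


def read_tagged(item):
    # item is an (index, seconds) pair; returning the index lets the caller
    # tell which input each result belongs to
    index, times = item
    time.sleep(times)
    return index, "read for {0}s".format(times)


if __name__ == '__main__':
    inputs = [0.3, 0.1, 0.2]
    with multiprocessing.Pool(3) as pool:
        # results arrive in completion order, typically the shortest sleep first
        finished = list(pool.imap_unordered(read_tagged, enumerate(inputs)))
    print(finished)
    # sorting by index restores the input order if it is needed later
    restored = [text for _, text in sorted(finished)]
    print(restored)  # ['read for 0.3s', 'read for 0.1s', 'read for 0.2s']
```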

Using ProcessPoolExecutor from concurrent.futures

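This was covered in the comparison between multithreading and multiprocessing above. Because it is used exactly like the thread pool, I won't repeat it here; see the examples in the official documentation.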
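Besides submit(), the executor also has a map() method that, like Pool.imap(), returns results in input order. A minimal sketch reusing the fib() function from the first example with a smaller range:

```python
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        # map() yields results in the order of the inputs
        results = list(executor.map(fib, range(20, 26)))
    print(results)  # [6765, 10946, 17711, 28657, 46368, 75025]
```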

Inter-process communication

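Communication between processes differs from communication between threads. Each process has its own memory space, so the locks and shared global variables used with threads no longer work here, and we need different tools for the job.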
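A quick way to see why global variables no longer work: a child process changes only its own copy. A minimal sketch; the variable name `counter` is illustrative:

```python
import multiprocessing

counter = 0  # module-level global


def increment():
    global counter
    counter += 1
    print("child sees counter =", counter)  # child sees counter = 1


if __name__ == '__main__':
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    # the child modified its own copy; the parent's global is untouched
    print("parent sees counter =", counter)  # parent sees counter = 0
```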

Using multiprocessing.Queue

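It is used the same way as the Queue from multithreading (see the documentation for the full list of methods). This kind of queue cannot be used with processes created through a Pool: it can only be handed to child processes when they are created, for example as an argument to Process, not passed as an argument to a pool task.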

    import multiprocessing
    import time

    def plus(queue):
        for i in range(6):
            # take the current value, add 1, and put it back
            num = queue.get() + 1
            queue.put(num)
            print(num)
            time.sleep(1)

    def subtract(queue):
        for i in range(6):
            # take the current value, subtract 1, and put it back
            num = queue.get() - 1
            queue.put(num)
            print(num)
            time.sleep(2)

    if __name__ == '__main__':
        # maxsize=1: the queue holds the single shared value
        queue = multiprocessing.Queue(1)
        queue.put(0)
        plus_process = multiprocessing.Process(target=plus, args=(queue,))
        subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
        plus_process.start()
        subtract_process.start()
        plus_process.join()
        subtract_process.join()

    # result (the interleaving depends on process scheduling and can vary between runs):
    # 1
    # 1
    # 2
    # 2
    # 3
    # 3
    # 0
    # 1
    # 2
    # 2
    # 1
    # 0
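A more typical use of multiprocessing.Queue is a producer/consumer pair, where a sentinel value tells the consumer that no more data is coming. A sketch; the function names and the use of None as the sentinel are illustrative assumptions (None must never appear as real data):

```python
import multiprocessing

SENTINEL = None  # assumption: None is never a real item


def producer(queue, items):
    for item in items:
        queue.put(item)
    queue.put(SENTINEL)  # tell the consumer there is nothing more


def consumer(queue, results):
    while True:
        item = queue.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        results.put(item * 10)


if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(tasks, [1, 2, 3]))
    c = multiprocessing.Process(target=consumer, args=(tasks, results))
    p.start()
    c.start()
    # Drain results before join(): a process that has put data on a queue does not
    # exit until that data is flushed to the pipe, so joining first can deadlock
    # when the payload is large
    collected = [results.get() for _ in range(3)]
    p.join()
    c.join()
    print(collected)  # [10, 20, 30]
```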

Using Queue from Manager()

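Manager() starts a server process and returns a manager object that synchronizes access between processes; it offers several ways to share data across processes. The objects it creates are proxies which, unlike a multiprocessing.Queue, can be passed to Pool workers.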

    import multiprocessing
    import time

    def plus(queue):
        for i in range(6):
            num = queue.get() + 1
            queue.put(num)
            print(num)
            time.sleep(1)

    def subtract(queue):
        for i in range(6):
            num = queue.get() - 1
            queue.put(num)
            print(num)
            time.sleep(2)

    if __name__ == '__main__':
        # keep a reference to the manager; its server process holds the actual queue
        manager = multiprocessing.Manager()
        queue = manager.Queue(1)  # a proxy object that can be passed to pool workers
        # queue = multiprocessing.Queue(1)  # this would not work with a Pool
        pool = multiprocessing.Pool(2)
        queue.put(0)
        pool.apply_async(plus, args=(queue,))
        pool.apply_async(subtract, args=(queue,))
        pool.close()
        pool.join()

    # result:
    # 0
    # 1
    # 1
    # 2
    # 2
    # 3
    # -1
    # 0
    # 1
    # 2
    # 1
    # 0
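To see the difference between the two queue types directly, the sketch below hands each of them to a pool task. The helper `put_value` is illustrative. With a plain multiprocessing.Queue the task should fail because the queue cannot be pickled for a pool worker, and the resulting RuntimeError surfaces when get() is called on the task's result:

```python
import multiprocessing


def put_value(queue, value):
    # runs in a pool worker and writes into the queue it was given
    queue.put(value)
    return value


if __name__ == '__main__':
    with multiprocessing.Manager() as manager, multiprocessing.Pool(2) as pool:
        # a manager proxy can be pickled and sent to pool workers
        shared = manager.Queue()
        pool.apply_async(put_value, (shared, "ok")).get(timeout=10)
        got = shared.get(timeout=10)
        print(got)  # ok

        # a plain multiprocessing.Queue cannot be sent as a pool task argument
        plain = multiprocessing.Queue()
        error = None
        try:
            pool.apply_async(put_value, (plain, "fails")).get(timeout=10)
        except RuntimeError as exc:
            error = exc
        print(type(error).__name__, error)
```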

Using list() from Manager()

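Multiple processes can share one list. Because it is shared between processes, a lock is used to keep access to it safe. The Manager().Lock here is not the thread-level Lock used earlier; it synchronizes between processes.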

    import multiprocessing as mp
    import time

    def add_person(waiting_list, name_list, lock):
        # while this process holds the lock, get_person() has to wait
        with lock:
            for name in name_list:
                waiting_list.append(name)
                time.sleep(1)
                print(waiting_list)

    def get_person(waiting_list, lock):
        with lock:
            if waiting_list:
                name = waiting_list.pop(0)
                print("get {0}".format(name))

    if __name__ == '__main__':
        # one manager is enough for both the shared list and the lock
        manager = mp.Manager()
        waiting_list = manager.list()
        lock = manager.Lock()  # use the lock to control access to the shared list
        name_list = ["MetaTian", "Rity", "Anonymous"]
        add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
        get_process = mp.Process(target=get_person, args=(waiting_list, lock))
        add_process.start()
        get_process.start()
        add_process.join()
        get_process.join()
        print(waiting_list)

    # result:
    # ['MetaTian']
    # ['MetaTian', 'Rity']
    # ['MetaTian', 'Rity', 'Anonymous']
    # get MetaTian
    # ['Rity', 'Anonymous']

Manager() provides many more tools for inter-process communication; see the official documentation.
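One of those tools is a shared dict. The sketch below counts words from several processes; because reading a value and writing it back are two separate proxy calls, the update is guarded by a manager Lock. The function name and word list are illustrative:

```python
import multiprocessing


def count_words(shared_counts, lock, words):
    for word in words:
        # get-then-set is two proxy calls, so another process could interleave without the lock
        with lock:
            shared_counts[word] = shared_counts.get(word, 0) + 1


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        counts = manager.dict()
        lock = manager.Lock()
        workers = [
            multiprocessing.Process(target=count_words,
                                    args=(counts, lock, ["a", "b", "a"]))
            for _ in range(4)
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        result = counts.copy()  # copy into a plain dict before the manager shuts down
    print(result)  # {'a': 8, 'b': 4}
```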

Using Pipe

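A Pipe has exactly two ends, so it only suits communication between two processes, but it performs better than a Queue. Pipe() returns two Connection objects; each process uses its end to send and receive data, much like the socket objects covered earlier. See the documentation on Connection objects for details.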

    import multiprocessing

    def plus(conn):
        for i in range(3):
            # start from 0, then use whatever the other end sent back
            num = 0 if i == 0 else conn.recv()
            conn.send(num + 1)
            print("plus send: {0}".format(num + 1))

    def subtract(conn):
        for i in range(3):
            num = conn.recv()  # blocks until plus() sends something
            conn.send(num - 1)
            print("subtract send: {0}".format(num - 1))

    if __name__ == '__main__':
        # Pipe() returns the two ends of a duplex (two-way) connection
        conn_plus, conn_subtract = multiprocessing.Pipe()
        plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
        subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
        plus_process.start()
        subtract_process.start()
        plus_process.join()
        subtract_process.join()

    # result:
    # plus send: 1
    # subtract send: 0
    # plus send: 1
    # subtract send: 0
    # plus send: 1
    # subtract send: 0

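send() can be called repeatedly to send several pieces of data; recv() takes out what the other end sent, one item at a time, and blocks if nothing is available yet.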
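When data only flows one way, Pipe(duplex=False) returns a receive-only end and a send-only end, and a sentinel tells the receiver to stop calling recv(). A sketch; the function name `sender` and the use of None as the end marker are illustrative assumptions:

```python
import multiprocessing


def sender(conn, items):
    for item in items:
        conn.send(item)  # any picklable object can be sent
    conn.send(None)      # illustrative end-of-data marker
    conn.close()


if __name__ == '__main__':
    # duplex=False: recv_end can only receive, send_end can only send
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=sender, args=(send_end, [1, "two", {"n": 3}]))
    p.start()
    received = []
    while True:
        item = recv_end.recv()  # blocks until the other end sends something
        if item is None:
            break
        received.append(item)
    p.join()
    print(received)  # [1, 'two', {'n': 3}]
```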



Original link: https://www.haomeiwen.com/subject/cizdeqtx.html