美文网首页Python语言
Python进程和线程(13)

Python进程和线程(13)

作者: 纳米君 | 来源:发表于2018-07-24 00:05 被阅读13次

学习Python也有很长一段时间了,进程和线程这块一直没作记录。直到最近在写一个爬虫,如果不搞多进程并发执行,爬虫时间令人发指。

说到进程,必然少不了线程。系统可以有多个进程,其中一个进程中可以有多个线程。
每个进程都有一个独立的GIL(全局解释器锁),多进程可以有效的利用多核CPU,而多线程只能占用CPU的一个核,因为线程执行时,必须先获取GIL才能执行,等到释放GIL,其他线程才有执行的机会,即一个进程中,不可能存在多个线程同时执行,现在系统均是多核CPU,所以都不推荐使用多线程。

先说说进程的事。Python提供了进程包multiprocessing

1. 进程,创建进程,定义一个target即可,如下:
from multiprocessing import Process

# 定义任务
def p_task(a):
    print('process args is %s' % a)

# 定义一个进程,执行一个任务
p = Process(target=p_task, args=(1,))
# 启动进程
p.start()
# 等待进程执行完毕
p.join()
print('over...')

输出结果:

process args is 1
over...
2. 进程池

如果需要创建多个进程并复用,一般采取进程池的方式,因为创建进程和销毁进程的开销很大。
创建进程池,池的大小一般等于系统CPU核数。cpu核数可以通过multiprocessing.cpu_count() 查看。
Python提供了两种创建方式:Pool()ProcessPoolExecutor()
两种执行效率如何呢?直接给结果:Pool的效率高于ProcessPoolExecutor,测试代码很容易些,这里就不贴多余的代码了。

第一种:Pool(),进程池有两个方法:
apply(self, func, args=(), kwds={}):同步执行func,源码如下:

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `func(*args, **kwds)`.
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

可以看出,还是调用的异步执行方法 self.apply_async(func, args, kwds),只不过apply_async后又调用了get()方法等待结果返回。
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):异步执行func
所以异步执行任务需要返回结果时,注意get()方法的使用,避免把异步变成同步。看示例:

import time
from multiprocessing import Pool

def task(args):
    time.sleep(2)
    return args + 1

pool = Pool(3)

l = []
start = time.time()
for i in range(5):
    # 循环里面千万别调用r.get()方法,否则会等待结果返回,异步变同步
    r = pool.apply_async(task, args=(i,))
    l.append(r)

# close方法表示不再接收新任务,之前的任务依旧执行
pool.close()
pool.join()
# pool.terminate()会立即终止进程池,包括正在执行的子进程
print('over... ', time.time() - start)

第二种:ProcessPoolExecutor(),说说它的map方法和submit方法。
map方法:按任务顺序返回进程return的值
submit方法:无序执行,返回future对象,但返回的future对象是有序的,等进程执行结束,可以通过future.result()获取进程return的值

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
from concurrent.futures import ProcessPoolExecutor


def task(i):
    time.sleep(0.5)
    return i


if __name__ == '__main__':
    count = 100

    list = [i for i in range(count)]
    start = time.time()
    with ProcessPoolExecutor(max_workers=8) as pool:
        # 有序输出,直接返回结果
        print([data for data in pool.map(task, list)])

    print('map time: ', time.time() - start)

    start = time.time()
    future_list = []
    with ProcessPoolExecutor(max_workers=8) as pool:
        # 返回future对象
        future = pool.submit(task, list)
        future_list.append(future)

    print([future.result() for future in future_list][0])
    print('submit time: ', time.time() - start)

输出结果:

map time:  6.669000148773193
submit time:  0.623999834060669

很显然,有序输出效率很低。一般情况下,推荐用submit方法。

3. 进程之间通信

进程都拥有自己独立的数据,它们之间默认无法共享数据。下面介绍几种进程间通信的方式。

方法一:使用Array

from multiprocessing import Process, Array

g = []

def share_task(a):
    g.extend(a)
    for i in range(len(a)):
        a[i] = a[i] * 2

if __name__ == '__main__':
    arr = Array('i', range(10))
    p = Process(target=share_task, args=(arr,))
    p.start()
    p.join()

    print('arr: ', arr[:])
    print('g: ', g[:])

输出结果:

arr:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
g:  []

g还是为空列表,表明默认数据不共享,然而通过Array,子进程改变了主进程的数组中的元素。

方法二:使用Manager

from multiprocessing import Process, Manager

def m_task(d, l):
    d[1] = '1'
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as m:
        m_dict = m.dict()
        m_list = m.list(range(10))
        p = Process(target=m_task, args=(m_dict, m_list))
        p.start()
        p.join()

        print('m_dict: ', m_dict)
        print('m_list: ', m_list)

输出结果:

m_dict:  {1: '1', 0.25: None}
m_list:  [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

方法三: 使用Queue

from multiprocessing import Process, Queue

def producer_task(q):
    for i in range(10):
        q.put(i)
        print('producer: ', i)


def consumer_task(q):
    for i in range(10):
        v = q.get()
        print('consumer: ', v)

if __name__ == '__main__':
    q = Queue()
    p_producer = Process(target=producer_task, args=(q,))
    p_consumer = Process(target=consumer_task, args=(q,))

    p_producer.start()
    p_consumer.start()

    p_producer.join()
    p_consumer.join()

输出结果:

producer:  0
producer:  1
producer:  2
producer:  3
consumer:  0
producer:  4
consumer:  1
producer:  5
producer:  6
consumer:  2
consumer:  3
producer:  7
producer:  8
consumer:  4
producer:  9
consumer:  5
consumer:  6
consumer:  7
consumer:  8
consumer:  9

还有Pipes等通信方式,这里就不多说了。


说完进程,就来说说线程,虽说都不推荐使用多线程,但是本人测试过多进程和多线程执行效率的差距,并没感觉到有多大差别。等后面写爬虫的时候会比较它们的执行效率。

1. 线程:

多线程共享一个变量的时候,记得加锁threading.Lock(),防止多个线程同时修改变量,造成数据错误。
threading.local()能记录每个线程独有的变量。

看代码和注释:

#!usr/bin/env python
# -*- coding:utf-8 _*-
import multiprocessing
import threading

g = 0
local = threading.local()


def task():
    lock = threading.Lock()
    lock.acquire()
    try:
        global g
        print('thread %s is running...' % threading.current_thread().name)
        for i in range(100000):
            g = g + 1
            g = g - 1
    finally:
        lock.release()


def local_task(name):
    local.name = name
    print(name)
    print_local()


def print_local():
    print('thread name: %s, local var is %s' % (threading.current_thread().name, local.name))


if __name__ == '__main__':
    print('thread %s is running...' % threading.current_thread().name)
    # 创建线程
    thread1 = threading.Thread(target=task, name='SonThread1')
    thread2 = threading.Thread(target=task, name='SonThread2')
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(g)

    thread3 = threading.Thread(target=local_task, name='SonThread3', args=('Tom',))
    thread4 = threading.Thread(target=local_task, name='SonThread4', args=('Lucy',))
    thread3.start()
    thread4.start()

    print('thread %s is ended...' % threading.current_thread().name)

输出结果:

thread MainThread is running...
thread SonThread1 is running...
thread SonThread2 is running...
0
Tom
thread name: SonThread3, local var is Tom
Lucy
thread name: SonThread4, local var is Lucy
thread MainThread is ended...
2. 线程池ThreadPoolExecutor:

其使用方式和进程池ProcessPoolExecutor一模一样。示例代码如下:

def task(i):
    time.sleep(0.5)
    return i


list = [i for i in range(count)]
with ThreadPoolExecutor(max_workers=8) as pool:
    # 有序输出,直接返回结果
    print([data for data in pool.map(task, list)])

相关文章

网友评论

    本文标题:Python进程和线程(13)

    本文链接:https://www.haomeiwen.com/subject/uzzrmftx.html