美文网首页
19--Python 并发与多线程

19--Python 并发与多线程

作者: Roger田翔 | 来源:发表于2019-08-09 19:44 被阅读0次
    @Author : Roger TX (425144880@qq.com)
    @Link : https://github.com/paotong999

    一、线程和进程

    1.1 什么是线程

    • 当一个程序进入内存运行时,即变成一个进程。
    • 进程是处于运行过程中的程序,并且具有一定的独立功能。
    • 进程是系统进行资源分配和调度的一个独立单位。

    1.2 进程的三个特征

    • 独立性:进程是系统中独立存在的实体,它可以拥有自己的独立的资源,每一个进程都拥有自己的私有的地址空间。在没有经过进程本身允许的情况下,一个用户进程不可以直接访问其他进程的地址空间。
    • 动态性:进程与程序的区别在于,程序只是一个静态的指令集合,而进程是一个正在系统中活动的指令集合。在进程中加入了时间的概念。进程具有自己的生命周期和各种不同的状态,在程序中是没有这些概念的。
    • 并发性:多个进程可以在单个处理器上并发执行,多个进程之间不会互相影响。

    1.3 什么是线程

    在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。

    • 多线程则扩展了多进程的概念,使得同一个进程可以同时并发处理多个任务。
    • 线程(Thread)也被称作轻量级进程(Lightweight Process),线程是进程的执行单元。就像进程在操作系统中的地位一样,线程在程序中是独立的、并发的执行流。
    • 线程是进程的组成部分,线程可以拥有自己的堆栈、自己的程序计数器和自己的局部变量,但不拥有系统资源,它与父进程的其他线程共享该进程所拥有的全部资源。
    • 线程的运行是抢占式的,一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发运行。
    • 线程的调度和管理由进程本身负责完成,在一个进程中可以包含多个线程,但至少要包含一个主线程。

    归纳来说:

    • 操作系统可以同时执行多个任务,每一个任务就是一个进程,进程可以同时执行多个任务,每一个任务就是一个线程。
    • 对于一个 CPU 而言,在某个时间点它只能执行一个程序。也就是说单 CPU 只能运行一个进程,CPU 只是不断地在这些进程之间轮换执行。

    二、Python 进程

    2.1 创建Python进程(UNIX)

    Python 的 os 模块提供了一个 fork() 方法,该方法可以 fork 出来一个子进程。

    os.fork() 方法在 Windows 系统上无效,只在 UNIX 系统上有效。

    fork() 方法不需要参数,它有一个返回值,该返回值表明是哪个进程在执行:

    • 如果 fork() 方法返回 0,则表明是 fork 出来的子进程在执行。
    • 如果 fork() 方法返回非 0,则表明是父进程在执行,该方法返回 fork() 出来的子进程的进程 ID。

    fork() 方法会启动两个进程(一个是父进程,一个是 fork 出来的子进程)来执行从 os.fork() 开始的所有代码。

    import os
    
    print('父进程(%s)开始执行' % os.getpid())
    # 开始fork一个子进程
    # 从这行代码开始,下面代码都会被两个进程执行
    pid = os.fork()
    print('进程进入:%s' % os.getpid())
    # 如果pid为0,表明子进程
    if pid == 0:
        print('子进程,其ID为 (%s), 父进程ID为 (%s)' % (os.getpid(), os.getppid()))
    else:
        print('我 (%s) 创建的子进程ID为 (%s).' % (os.getpid(), pid))
    print('进程结束:%s' % os.getpid())
    

    结果如下:

    父进程(1795)开始执行
    进程进入:1795
    我(1795)创建的子进程ID 为(1796).
    进程结束:1795
    进程进入:1796
    子进程,其ID 为(1796),父进程ID 为(1795)
    进程结束: 1796
    

    2.2 创建Python进程(WINDOWS)

    Python 在 multiprocessing 模块下提供了 Process 来创建新进程。

    使用 Process 创建新进程也有两种方式:

    • 以指定函数作为 target,创建 Process 对象即可创建新进程。
    • 继承 Process 类,并重写它的 run() 方法来创建进程类,程序创建 Process 子类的实例作为进程。

    Process 类也有如下类似的方法和属性:

    • run():重写该方法可实现进程的执行体。
    • start():该方法用于启动进程。
    • join([timeout]):该方法类似于线程的 join() 方法,当前进程必须等待被 join 的进程执行完成才能向下执行。
    • name:该属性用于设置或访问进程的名字。
    • is_alive():判断进程是否还活着。
    • daemon:该属性用于判断或设置进程的后台状态。
    • pid:返回进程的 ID。
    • authkey:返回进程的授权 key。
    • terminate():中断该进程。

    指定函数作为target创建新进程

    import multiprocessing
    import os
    
    # 定义一个普通的action函数,该函数准备作为进程执行体
    def action(max):
        for i in range(max):
            print("(%s)子进程(父进程:(%s)):%d" %
                (os.getpid(), os.getppid(), i))
    if __name__ == '__main__':
        # 下面是主程序(也就是主进程)
        for i in range(100):
            print("(%s)主进程: %d" % (os.getpid(), i))
            if i == 20:
                # 创建并启动第一个进程
                mp1 = multiprocessing.Process(target=action,args=(100,))
                mp1.start()
                # 创建并启动第一个进程
                mp2 = multiprocessing.Process(target=action,args=(100,))
                mp2.start()
                mp2.join()
        print('主进程执行完成!')
    

    继承Process类创建子进程

    • 定义继承 Process 的子类,重写其 run() 方法准备作为进程执行体。
    • 创建 Process 子类的实例。
    • 调用 Process 子类的实例的 start() 方法来启动进程。
    import multiprocessing
    import os
    class MyProcess(multiprocessing.Process):
        def __init__(self, max):
            self.max = max
            super().__init__()
        # 重写run()方法作为进程执行体
        def run(self):
            for i in range(self.max):
                print("(%s)子进程(父进程:(%s)):%d" %
                    (os.getpid(), os.getppid(), i))
    if __name__ == '__main__':
        # 下面是主程序(也就是主进程)
        for i in range(100):
            print("(%s)主进程: %d" % (os.getpid(), i))
            if i == 20:
                # 创建并启动第一个进程
                mp1 = MyProcess(100)
                mp1.start()
                # 创建并启动第一个进程
                mp2 = MyProcess(100)
                mp2.start()
                mp2.join()
        print('主进程执行完成!')
    

    2.3 Python使用进程池管理进程

    如果程序需要启动多个进程,可以使用进程池来管理进程。程序可以通过 multiprocessing 模块的 Pool() 函数创建进程池。
    进程池具有如下常用方法:

    • apply(func[, args[, kwds]]):将 func 函数提交给进程池处理。其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数。该方法会被阻塞直到 func 函数执行完成。
    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):这是 apply() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。
    • map(func, iterable[, chunksize]):类似于 Python 的 map() 全局函数,只不过此处使用新进程对 iterable 的每一个元素执行 func 函数。
    • map_async(func, iterable[, chunksize[, callback[, error_callback]]]):这是 map() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。
    • imap(func, iterable[, chunksize]):这是 map() 方法的延迟版本。
    • imap_unordered(func, iterable[, chunksize]):功能类似于 imap() 方法,但该方法不能保证所生成的结果(包含多个元素)与原 iterable 中的元素顺序一致。
    • starmap(func, iterable[,chunksize]):功能类似于 map() 方法,但该方法要求 iterable 的元素也是 iterable 对象,程序会将每一个元素解包之后作为 func 函数的参数。
    • close():关闭进程池。在调用该方法之后,该进程池不能再接收新任务,它会把当前进程池中的所有任务执行完成后再关闭自己。
    • terminate():立即中止进程池。
    • join():等待所有进程完成。
    1. 如果程序只是想将任务提交给进程池执行,则可调用 apply() 或 apply_async() 方法
    2. 如果程序需要使用指定函数将 iterable 转换成其他 iterable,则可使用 map() 或 imap() 方法

    使用 apply_async() 方法启动进程:

    import multiprocessing
    import time
    import os
    
    def action(name='default'):
        print('(%s)进程正在执行,参数为: %s' % (os.getpid(), name))
        time.sleep(3)
    if __name__ == '__main__':
        # 创建包含4条进程的进程池
        pool = multiprocessing.Pool(processes=4)
        # 将action分3次提交给进程池
        pool.apply_async(action)
        pool.apply_async(action, args=('位置参数', ))
        pool.apply_async(action, kwds={'name': '关键字参数'})
        pool.close()
        pool.join()
    

    使用 map() 方法来启动进程:

    import multiprocessing
    import time
    import os
    
    def action(name='default'):
        print('(%s)进程正在执行,参数为: %s' % (os.getpid(), name))
        time.sleep(3)
    if __name__ == '__main__':
        # 创建包含4条进程的进程池
        pool = multiprocessing.Pool(processes=4)
        # 将action分3次提交给进程池
        pool.apply_async(action)
        pool.apply_async(action, args=('位置参数', ))
        pool.apply_async(action, kwds={'name': '关键字参数'})
        pool.close()
        pool.join()
    

    2.4 Python进程间通信

    Python 为进程通信提供了两种机制:

    • Queue:一个进程向 Queue 中放入数据,另一个进程从 Queue 中读取数据。
    • Pipe:Pipe 代表连接两个进程的管道。程序在调用 Pipe() 函数时会产生两个连接端,分别交给通信的两个进程,接下来进程既可从该连接端读取数据,也可向该连接端写入数据。

    使用Queue实现进程间通信

    • Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
    • Queue.empty():判断队列是否为空。
    • Queue.full():判断队列是否已满。
    • Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
    • Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
    • Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
    • Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
    import multiprocessing
    def f(q):
        print('(%s) 进程开始放入数据...' % multiprocessing.current_process().pid)
        q.put('Python')
    if __name__ == '__main__':
        # 创建进程通信的Queue
        q = multiprocessing.Queue()
        # 创建子进程
        p = multiprocessing.Process(target=f, args=(q,))
        # 启动子进程
        p.start()
        print('(%s) 进程开始取出数据...' % multiprocessing.current_process().pid)
        # 取出数据
        print(q.get())  # Python
        p.join()
    

    使用Pipe实现进程间通信

    • send(obj):发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可 picklable 的(Python 的序列化机制),如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常。
    • recv():接收另一端通过 send() 方法发送过来的数据。
    • fileno():关于连接所使用的文件描述器。
    • close():关闭连接。
    • poll([timeout]):返回连接中是否还有数据可以读取。
    • send_bytes(buffer[, offset[, size]]):发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size 的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收。
    • recv_bytes([maxlength]):接收通过 send_bytes() 方法发迭的数据,maxlength 指定最多接收的字节数。该方法返回接收到的字节数据。
    • recv_bytes_into(buffer[, offset]):功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中。
    import multiprocessing
    
    def f(conn):
        print('(%s) 进程开始发送数据...' % multiprocessing.current_process().pid)
        # 使用conn发送数据
        conn.send('Python')
    if __name__ == '__main__':
        # 创建Pipe,该函数返回两个PipeConnection对象
        parent_conn, child_conn = multiprocessing.Pipe()
        # 创建子进程
        p = multiprocessing.Process(target=f, args=(child_conn, ))
        # 启动子进程
        p.start()
        print('(%s) 进程开始接收数据...' % multiprocessing.current_process().pid)
        # 通过conn读取数据
        print(parent_conn.recv())  # Python
        p.join()
    

    三、Python 线程

    3.1 创建Python线程

    Python 提供了 _thread 和 threading 两个模块来支持多线程,其中 _thread 提供低级别的、原始的线程支持,以及一个简单的锁,而 threading 模块则提供了功能丰富的多线程支持。

    Python 主要通过两种方式来创建线程:

    • 使用 threading 模块的 Thread 类的构造器创建线程。
    • 继承 threading 模块的 Thread 类创建线程类。

    3.2 调用 Thread 类的构造器创建线程

    调用 Thread 类的构造器创建线程,直接调用 threading.Thread 类的构造器创建线程:

    __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)

    上面的构造器涉及如下几个参数:

    • group:指定该线程所属的线程组。目前该参数还未实现,因此它只能设为 None。
    • target:指定该线程要调度的目标方法。
    • args:指定一个元组,以位置参数的形式为 target 指定的函数传入参数。元组的第一个元素传给 target 函数的第一个参数,元组的第二个元素传给 target 函数的第二个参数……依此类推。
    • kwargs:指定一个字典,以关键字参数的形式为 target 指定的函数传入参数。
    • daemon:指定所构建的线程是否为后台线程。
    import threading
    
    # 定义一个普通的action函数,该函数准备作为线程执行体
    def action(max):
        for i in range(max):
            # 调用threading模块current_thread()函数获取当前线程
            # 线程对象的getName()方法获取当前线程的名字
            print(threading.current_thread().getName() +  " " + str(i))
    # 下面是主程序(也就是主线程的执行体)
    for i in range(100):
        # 调用threading模块current_thread()函数获取当前线程
        print(threading.current_thread().getName() +  " " + str(i))
        if i == 20:
            # 创建并启动第一个线程
            t1 =threading.Thread(target=action,args=(100,))
            t1.start()
            # 创建并启动第二个线程
            t2 =threading.Thread(target=action,args=(100,))
            t2.start()
    print('主线程执行完成!')
    

    从运行结果中可以发现:

    程序中共包含三个线程,这三个线程的执行没有先后顺序,它们以并发方式执行,Thread-1 执行一段时间,然后可能 Thread-2 或 MainThread 获得 CPU 执行一段时间,接下来又换其他线程执行,这就是典型的线程并发执行,CPU 以快速轮换的方式在多个线程之间切换

    上面程序还用到了如下函数和方法:

    threading.current_thread():它是 threading 模块的函数,该函数总是返回当前正在执行的线程对象。
    getName():它是 Thread 类的实例方法,该方法返回调用它的线程名字。
    setName():它是 Thread 类的实例方法,该方法为线程设置名字。

    3.3 继承 Thread 类创建线程类

    通过继承 Thread 类来创建并启动线程的步骤如下:

    • 定义 Thread 类的子类,并重写该类的 run() 方法。run() 方法的方法体就代表了线程需要完成的任务,因此把 run() 方法称为线程执行体。
    • 创建 Thread 子类的实例,即创建线程对象。
    • 调用线程对象的 start() 方法来启动线程。
    import threading
    
    # 通过继承threading.Thread类来创建线程类
    class FkThread(threading.Thread):
        def __init__(self): 
            threading.Thread.__init__(self)
            self.i = 0
        # 重写run()方法作为线程执行体
        def run(self): 
            while self.i < 100:
                # 调用threading模块current_thread()函数获取当前线程
                # 线程对象的getName()方法获取当前线程的名字
                print(threading.current_thread().getName() +  " " + str(self.i))
                self.i += 1
    # 下面是主程序(也就是主线程的执行体)
    for i in range(100):
        # 调用threading模块current_thread()函数获取当前线程
        print(threading.current_thread().getName() +  " " + str(i))
        if i == 20:
            # 创建并启动第一个线程
            ft1 = FkThread()
            ft1.start()
            # 创建并启动第二个线程
            ft2 = FkThread()
            ft2.start()
    print('主线程执行完成!')
    

    3.4 线程锁

    多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

    import time, threading
    
    # 假定这是你的银行存款:
    balance = 0
    
    def change_it(n):
        # 先存后取,结果应该为0:
        global balance
        balance = balance + n
        balance = balance - n
    
    def run_thread(n):
        for i in range(100000):
            change_it(n)
    
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
    

    t1和t2是交替运行的,当操作系统以下面的顺序执行t1、t2:

    初始值 balance = 0
    
    t1: x1 = balance + 5  # x1 = 0 + 5 = 5
    
    t2: x2 = balance + 8  # x2 = 0 + 8 = 8
    t2: balance = x2      # balance = 8
    
    t1: balance = x1      # balance = 5
    t1: x1 = balance - 5  # x1 = 5 - 5 = 0
    t1: balance = x1      # balance = 0
    
    t2: x2 = balance - 8  # x2 = 0 - 8 = -8
    t2: balance = x2   # balance = -8
    
    结果 balance = -8
    

    要确保balance计算正确,就要给change_it()上线程锁

    def run_thread(n):
        for i in range(100000):
            # 先要获取锁:
            lock = threading.Lock()
            lock.acquire()
            try:
                # 放心地改吧:
                change_it(n)
            finally:
                # 改完了一定要释放锁:
                lock.release()
    

    如果使用线程锁

    • 创建一个锁就是通过threading.Lock()来实现。
    • 当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
    • 获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。

    四、多任务运行

    4.1 Python多线程的局限

    Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

    4.2 计算密集型与IO密集型

    我们可以把任务分为计算密集型和IO密集型。

    第一种任务的类型是计算密集型,特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

    计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

    第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

    IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

    4.3 异步IO

    考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。

    现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

    对应到Python语言,单线程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。

    相关文章

      网友评论

          本文标题:19--Python 并发与多线程

          本文链接:https://www.haomeiwen.com/subject/wffycctx.html