美文网首页
进程,线程,协程与python的实现

进程,线程,协程与python的实现

作者: 睡不醒的大橘 | 来源:发表于2020-07-12 17:57 被阅读0次

    进程

    进程
    • 进程是程序执行的过程,包括了动态创建、调度和消亡的整个过程,进程是程序资源管理的最小单位。
    • 进程管理的资源包括:CPU(寄存器),IO, 内存,网络资源等
    进程地址空间
    • 当创建一个进程时,操作系统会为该进程分配一个 4GB 大小的虚拟进程地址空间
    • 操作系统采用虚拟内存技术,把进程虚拟地址空间划分成用户空间内核空间。每个进程的用户地址空间都是独立的,一般而言是不能互相访问的,但内核空间是每个进程都共享的,所以进程之间要通信必须通过内核。
    • 用户空间按照访问属性一致的地址空间存放在一起的原则,划分成 5个不同的内存区域,总计 3G 的容量:
    内存区域 存储内容
    局部变量,函数参数,函数的返回值
    动态分配的内存
    BSS段 未初始化或初值为0的全局变量和静态局部变量
    数据段 已初始化且初值非0的全局变量和静态局部变量
    代码段 可执行文件的操作指令,可执行程序在内存中的镜像(只读)
    • 在 x86 32 位系统里,Linux 内核地址空间是指虚拟地址从 0xC0000000 开始到 0xFFFFFFFF 为止的高端内存地址空间,总计 1G 的容量, 包括了内核镜像、物理页面表、驱动程序等运行在内核空间 。
    进程间通信
    • 当一个进程创建时可以共享程序代码的页,但它们各自有独立的数据拷贝(堆和栈),因此子进程对内存单元的修改对父进程是不可见的。
    • 进程间的通信方式包括管道消息队列共享内存信号量信号Socket
    管道
    • 所谓的管道,就是内核里面的一串缓存。从管道的一端写入的数据,实际上是缓存在内核中的,另一端读取,也就是从内核中读取这段数据。另外,管道传输的数据是无格式的流且大小受限。匿名管道的通信的方式是单向的,如果要双向通信,需要创建两个管道。管道分为匿名管道命名管道

    • 匿名管道:没有名字标识,是只存在于内存的特殊文件,不存在于文件系统中。匿名管道只能用于存在父子关系的进程间通信。(创建的子进程会复制父进程的文件描述符,这样就做到了两个进程各有两个「 fd[0] 与 fd[1]」,两个进程就可以通过各自的 fd 写入和读取同一个管道文件实现跨进程通信了)它的生命周期随着进程创建而建立,随着进程终止而消失。shell 命令中的 “|”

    • 命名管道:需要在文件系统创建一个类型为 p 的设备文件,不相干的进程从而可以通过这个设备文件进行通信。

    • 管道这种通信方式效率低,不适合进程间频繁地交换数据

    消息队列
    • 消息队列是保存在内核中的消息链表,在发送数据时,会分成一个一个独立的数据单元,也就是消息体(数据块),消息体是用户自定义的数据类型,消息的发送方和接收方要约定好消息体的数据类型,所以每个消息体都是固定大小的存储块,不像管道是无格式的字节流数据。如果进程从消息队列中读取了消息体,内核就会把这个消息体删除。
    • 消息队列不适合比较大数据的传输,因为在内核中每个消息体都有一个最大长度的限制,同时所有队列所包含的全部消息体的总长度也是有上限。
    • 消息队列通信过程中,存在用户态与内核态之间的数据拷贝开销。因为进程写入数据到内核中的消息队列时,会发生从用户态拷贝数据到内核态的过程,同理另一进程读取内核中的消息数据时,会发生从内核态拷贝数据到用户态的过程。
    共享内存
    • 现代操作系统,对于内存管理,采用的是虚拟内存技术,也就是每个进程都有自己独立的虚拟内存空间,不同进程的虚拟内存映射到不同的物理内存中。所以,即使进程 A 和 进程 B 的虚拟地址是一样的,其实访问的是不同的物理内存地址,对于数据的增删查改互不影响。

    • 共享内存的机制,就是拿出一块虚拟地址空间来,映射到相同的物理内存中。这样这个进程写入的东西,另外一个进程马上就能看到了,都不需要拷贝来拷贝去,传来传去,大大提高了进程间通信的速度。

    信号量
    • 用了共享内存通信方式,带来新的问题,那就是如果多个进程同时修改同一个共享内存,很有可能就冲突了。为了防止多进程竞争共享资源,而造成的数据错乱,信号量使得共享的资源,在任意时刻只能被一个进程访问。
    • 信号量是一个整型的计数器,表示的是资源个数,其值可以通过两个原子操作来控制,分别是 P 操作和 V 操作。主要用于实现进程间的互斥(初始化信号量为1)与同步(初始化信号量为0),而不是用于缓存进程间通信的数据。
    信号
    • 在 Linux 操作系统中, 为了响应各种各样的事件,提供了几十种信号,分别代表不同的意义。我们可以通过 kill -l 命令,查看所有的信号。
    • 运行在 shell 终端的进程,我们可以通过键盘输入某些组合键的时候,给进程发送信号。例如:
    Ctrl+C 产生 SIGINT 信号,表示终止该进程;
    Ctrl+Z 产生 SIGTSTP 信号,表示停止该进程,但还未结束;
    kill -9 1050 ,表示给 PID 为 1050 的进程发送 SIGKILL 信号,用来立即结束该进程
    
    Socket
    • 前面提到的管道、消息队列、共享内存、信号量和信号都是在同一台主机上进行进程间通信,那要想跨网络与不同主机上的进程之间通信,就需要 Socket 通信了。。
    Python实现
    1. multiprocessing
    from multiprocessing import Process
    import os, time
    
    # 子进程要执行的代码
    def run_proc(name):
        time.sleep(2)
        print('Run child process %s (%s)...' % (name, os.getpid()))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Process(target=run_proc, args=('test',))
        print('Child process will start.')
        p.start()
        p.join()
        print('Child process end.')
    
    • join所完成的工作就是阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程
    1. multiprocessing.Pool 进程池
    from multiprocessing import Pool
    import os, time, random
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(2)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Pool(2)
        for i in range(3):
            p.apply_async(long_time_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    
    Parent process 5584.
    Waiting for all subprocesses done...
    Run task 0 (12836)...
    Run task 1 (5916)...
    Task 0 runs 2.00 seconds.
    Run task 2 (12836)...
    Task 1 runs 2.00 seconds.
    Task 2 runs 2.00 seconds.
    All subprocesses done.
    
    • Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
    1. 进程间通信
    • 基于共享内存:进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(),列表)。

    • 基于管道:包括 multiprocessing.Pipe(),multiprocessing.Queue()。

    from multiprocessing import Process, Queue
    import os, time
    
    # 写数据进程执行的代码:
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A', 'B', 'C']:
            print('Put %s to queue.' % value)
            q.put(value)
            time.sleep(2)
    
    # 读数据进程执行的代码:
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.get(True)
            print('Get %s from queue.' % value)
    
    if __name__=='__main__':
        # 父进程创建Queue,并传给各个子进程:
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
        # 启动子进程pw,写入:
        pw.start()
        # 启动子进程pr,读取:
        pr.start()
        # 等待pw结束:
        pw.join()
        # pr进程里是死循环,无法等待其结束,只能强行终止:
        pr.terminate()
    
    Process to read: 9504
    Process to write: 10672
    Put A to queue.
    Get A from queue.
    Put B to queue.
    Get B from queue.
    Put C to queue.
    Get C from queue.
    
    • 一个子进程向Queue中写数据,另外一个进程从Queue中取数据,当一个Queue为空的用get取数据会进程会被阻塞。

    线程

    线程
    • 线程是操作操作系统能够进行运算调度的最小单位。线程被包含在进程之中,是进程中的实际运作单位,一个进程内可以包含多个线程,线程是资源调度的最小单位。
    线程的内存模型
    • 每个线程独立的线程上下文:一个唯一的整数线程ID,栈和栈指针,程序计数器,通用目的寄存器和条件码
    • 和其他线程共享的进程上下文的剩余部分:整个用户虚拟地址空间,包括只读代码段,读/写数据段,堆以及所有的共享库代码和数据区域,也共享所有打开文件的集合
    线程的状态
    • 线程的状态分为:
    1. 可运行 (runnable):线程被创建之后,调用Start()函数就到了这个状态。
    2. 运行 (running):start()函数之后,CPU切换到了这个线程开始执行里面的Run方法就称为运行状态。
    3. 阻塞 (blocked):阻塞状态是指线程因为某种原因放弃了cpu执行权,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu 执行权 转到运行(running)状态。
    • 根据程序分别处于 Running 以及 IO Blocked 两种状态的时间占比,可分为两种:
    1. 计算密集型 ,程序执行时大部分时间处于 Running 状态;
    2. IO密集型 ,程序执行时大部分时间处于 IO Blocked 状态;
    线程的调度
    • 线程分类:
    1. 内核级线程:

    内核线程建立和销毁都是由操作系统负责、通过系统调用完成,内核维护进程及线程的上下文信息以及线程切换。

    程序一般不会直接使用内核线程,而是使用内核线程的一种高级接口-轻量级进程(Light-weight Process简称LWP ,是一种由内核支持的用户线程,每一个轻量级进程都与一个特定的内核线程关联。)。

    优势:内核级线级能参与全局的多核处理器资源分配,充分利用多核 CPU 优势。

    局限性:需要系统调度,系统调度代价大,需要在用户态和内核态来回切换;内核进程数量有限。

    1. 用户级线程

    用户线程的创建、调度、同步和销毁全由用户空间的库函数完成,不需要内核的参与。内核的调度对象是进程本身,内核并不知道用户线程的存在。

    优势:性能极强。用户线程的建立,同步,销毁和调度完全在用户态中完成,操作非常快,低消耗,无需切换内核态。

    局限性:所有操作都需要自己实现,逻辑极其复杂。 用户线级线程只能参与竞争该进程的处理器资源,不能参与全局处理器资源的竞争。

    • 线程的调度最常用的两种是:
    1. 分时调度
      所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间。

    2. 抢占式调度
      优先让优先级高的线程使用 CPU,如果线程的优先级相同,那么会随机选择一个(线程随机性)。

    • Python 线程调度实现方式参考了分时调度的思路,只不过将时间片换成字节码 。当一个线程取得 GIL 全局锁并开始执行字节码时,对已执行字节码进行计数。当执行字节码达到一定数量,或者线程主动让出控制(time.sleep(), wait(), blocked IO等)时,线程主动释放 GIL 全局锁。
    Python实现
    1. 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响。而多线程中,静态变量,实例变量被线程共享,局部变量不被线程共享
    from threading import Thread
    
    num = 0
    
    def run_thread(name):
        global num
        num = num + 1
        print('{} num: {}'.format(name, num))
    
    if __name__=='__main__':
        p1 = Thread(target=run_thread, args=('thread1',))
        p2 = Thread(target=run_thread, args=('thread2',))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        
    # thread1 num: 1
    # thread2 num: 2
    
    from multiprocessing import Process
    
    num = 0
    
    def run_proc(name):
        global num
        num = num + 1
        print('{} num: {}'.format(name, num))
    
    if __name__=='__main__':
        p1 = Process(target=run_proc, args=('process1',))
        p2 = Process(target=run_proc, args=('process2',))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        
    # process1 num: 1
    # process2 num: 1
    
    1. 线程之间共享数据最大的危险在于多个线程同时改一个变量:
    from threading import Thread
    
    num = 0
    
    def change_num(name):
        global num
        num = num + 1
    
    def run_thread(name):
        for i in range(1000000):
            change_num(name)
    
    if __name__=='__main__':
        p1 = Thread(target=run_thread, args=('thread1',))
        p2 = Thread(target=run_thread, args=('thread2',))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print(num)
        
    # 1851956
    
    • num的输出值无法预料。原因是对于 num = num + 1 可以分为两步:1)计算num + n,存入该线程的临时变量中; 2)将临时变量的值赋给num。
    • 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期,我们因此也称为“线程不安全”。
    1. 可以使用互斥锁(Lock),同一时刻只允许一个线程执行操作
    from threading import Thread, Lock
    
    num = 0
    
    def change_num(name, lock):
        global num
        lock.acquire()
        num = num + 1
        lock.release()
    
    def run_thread(name, lock):
        for i in range(1000000):
            change_num(name, lock)
    
    if __name__=='__main__':
        lock = Lock()
        p1 = Thread(target=run_thread, args=('thread1', lock))
        p2 = Thread(target=run_thread, args=('thread2', lock))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print(num)
        
    # 2000000
    
    1. 由于python GIL全局锁,python多线程并不能提升计算密集型任务的效率。这种情况下可以考虑使用多进程。
    from threading import Thread
    import time
    
    def change_num(name):
        nums = [0] * 100
        for num in nums:
            for i in range(1000000):
                num = num + 1
    
    if __name__=='__main__':
        p1 = Thread(target=change_num, args=('thread1',))
        start = time.time()
        p1.start()
        p1.join()
        end = time.time()
        print('runs %0.2f seconds.' % (end - start))
        
    # runs 21.23 seconds.
    
    from threading import Thread
    import time
    
    def change_num(name):
        nums = [0] * 100
        for num in nums:
            for i in range(1000000):
                num = num + 1
    
    if __name__=='__main__':
        p1 = Thread(target=change_num, args=('thread1',))
        p2 = Thread(target=change_num, args=('thread2',))
        p3 = Thread(target=change_num, args=('thread2',))
        start = time.time()
        p1.start()
        p2.start()
        p3.start()
        p1.join()
        p2.join()
        p3.join()
        end = time.time()
        print('runs %0.2f seconds.' % (end - start))
        
    # runs 19.49 seconds.
    

    协程

    协程
    • 协程,又称微线程。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。
    • 协程的调度完全由用户控制,协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作用户空间栈,完全没有内核切换的开销。
    • 协程适用于高并发IO密集型场景
    Python实现
    1. python 2.5 中引入 yield/send 表达式用于实现协程
    def consumer():
        r = ''
        while True:
            n = yield r
            if not n:
                return
            print('Consuming %s...' % n)
            r = '200 OK'
    
    def produce(c):
        c.__next__()
        n = 0
        while n < 3:
            n = n + 1
            print('Producing %s...' % n)
            r = c.send(n)
            print('Consumer return: %s' % r)
        c.close()
    
    if __name__=='__main__':
        c = consumer()
        produce(c)
    
    Producing 1...
    Consuming 1...
    Consumer return: 200 OK
    Producing 2...
    Consuming 2...
    Consumer return: 200 OK
    Producing 3...
    Consuming 3...
    Consumer return: 200 OK
    
    • 如果一个函数中包含yield关键字,它就不是一个普通函数,而是一个生成器(generator)调用函数就是创建了一个generator对象。
    • 这个函数,在每次调用next()(python3.x中更名为next())的时候执行,遇到yield语句返回,再次执行时从上次返回的yield语句处继续执行。
    • send() 和next() 一样,都能让生成器继续往下走一步(下次遇到yield停),但send()能传一个值,这个值作为yield表达式整体的结果
    • 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
    1. gevent是一个广泛使用的协程框架,它的一个特点是可以用同步的方法写异步应用,不需要写回调函数:
    from gevent import monkey; monkey.patch_all()
    import gevent
    import time
    
    def f(num):
        print('start: ' + str(num))
        time.sleep(3)
        print('end: ' + str(num))
    
    
    gevent.joinall([
            gevent.spawn(f, 1),
            gevent.spawn(f, 2),
            gevent.spawn(f, 3),
    ])
    
    start: 1
    start: 2
    start: 3
    end: 1
    end: 2
    end: 3
    
    • 我们之所以可以用阻塞的time.sleep()是因为gevent对其进行了monkey_patch,会将所有的socket patch 成非阻塞的,time.sleep()也就变成了gevent.sleep()。

    • gevent使用多路IO复用对文件描述符的事件监听,当处理一个socket链接时,就创建一个协程Greenlet去处理

    • 当socket遇到阻塞的时候,比如等待数据的返回或者发送,此时gevent会为这个socket的fd在epoll上添加可读或者可写事件回调。然后,通过 get_hub().switch() 切换到主协程,去干其它事情。当该socket可读或者可写时,epoll会调用上述添加的回调函数,从而切换回socket的处理协程,从上次悬挂点接着往下执行。

    • gevent的monkey patch有个需要注意的地方,它不会patch c扩展模块中的socket。比如说MySQL连接,它仍然是阻塞的。

    1. python 3.5 之后还引入async/await。

    1)

    import asyncio
    
    async def hw_async():
        print('Hello world!')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(hw_async())
    loop.close()
    
    • async 是明确将函数声明为协程的关键字。这样的函数执行时会返回一个协程对象。
    • event loop是协程执行的控制点,如果希望执行协程,就需要用到它们。asyncio启动默认的event loop(asyncio.get_event_loop()), 调度并执行异步任务, 关闭event loop。loop.run_until_complete()这个函数是阻塞执行的, 直到所有的异步函数执行完毕。

    2)

    import asyncio
    import datetime
    from threading import currentThread
    
    async def hw_async(num):
        print(f"Hello world {num} begin at: {datetime.datetime.now().strftime('%S')},thread:{currentThread().name}")
        await asyncio.sleep(3)
        print(f"Hello world {num} end at: {datetime.datetime.now().strftime('%S')}")
    
    loop = asyncio.get_event_loop()
    
    tasks = [hw_async(1), hw_async(2),hw_async(3)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    
    Hello world 3 begin at: 53,thread:MainThread
    Hello world 1 begin at: 53,thread:MainThread
    Hello world 2 begin at: 53,thread:MainThread
    Hello world 3 end at: 56
    Hello world 1 end at: 56
    Hello world 2 end at: 56
    
    • 从输出可以看出,三个不同的协程函数都是在MainThread线程完成的。并且异步执行。

    • await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。且await后面的对象需要是一个awaitable,有三种主要类型: 协程, 任务 和 Future。

    • await io操作,此时,当前协程就会被挂起,时间循环转而执行其他协程。但并不是说所有协程里的await都会导致当前携程的挂起,如果跟的是我们定义的携程,则会执行这个携程,如果是asyncio模块制作者定义的固有协程,比如模拟io操作的asyncio.sleep,以及io操作,比如网络io:asyncio.open_connection这些,才会挂起当前携程。

    • loop.run_until_complete(asyncio.wait(tasks))运行时,会首先将tasks列表里的Coroutines先转换为future

    什么时候使用进程,线程和协程

    • I/O密集型:适合协程
    • 计算密集型:适合多进程 (但进程数量存在限制)
    • I/O密集型 + 计算密集型:多进程 + 协程

    相关文章

      网友评论

          本文标题:进程,线程,协程与python的实现

          本文链接:https://www.haomeiwen.com/subject/xzkncktx.html