美文网首页
Python 中的进程、线程、协程详解

Python 中的进程、线程、协程详解

作者: 凯撒网络研究院 | 来源:发表于2018-05-11 17:13 被阅读0次

    目录

    [toc]


    概念

    1. CPU在同一时刻只能处理一个任务,cpu在各个任务之间来回的进行切换,只是因为cpu执行速度很快,误认为是同时执行,但是python的线程是伪线程,即使是多核cpu也只是同时执行一个进程
    2. 一个进程占一块内存,每一个程序的内存是独立的
    3. 进程需要执行必须要创建一个线程
    4. 同一个进程的线程共享同一块资源

    进程:
    在进行的一个任务(一些资源的集合:),由cpu执行

    线程:
    是操作系统最小的调度单位,是一串指令的集合

    区别:

    1.进程快还是线程快?:一样快 进程是通过线程执行所以是线程同线程比较
    2.启动线程快还是进程快? 启动线程快,启动线程:要申请内存空间等 ,启动进程:直接执行指令
    3.线程共享内存空间,进程内存是独立的
    4.创建新线程更简单,创建新进程要拷贝父进程
    5.一个线程可以操作同一个进程里的其他线程,进程只能操作子进程
    

    线程

    使用场景

    1. 不适合cpu()密集型任务,适合io(数据读取写入)操作密集型任务

    创建进程

    使用方法的方式使用线程

    def run(name):
        print(name)
        time.sleep(2)
    threads=[ threading.Thread(target=run,args=(i,)) for i in range(10)] #用列表生成式 生成10个线程
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    

    使用类的方式使用线程

    
    class MyThread(threading.Thread):
        def __init__(self,n):
            super(MyThread,self).__init__()
            self.n=n
        def run(self):
            print(self.n)
    
    threads=[ MyThread(str(i)) for i in range(10)] #用列表生成式 生成10个线程
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    

    join

    主线程创建子线程后,两者并不影响,所以是并行执行,造成主线程结束后,子线程还在运行。那么我们需要主线程要等待子线程运行完后,再退出,就要使用join

    def run(name):
        for i in range(3):
            print(name,i)
            time.sleep(2)
    
    threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程
    
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    for t in threads: #join创建的10个线程
        t.join()
    print('main finished...') #join后,所有子线程执行完才会执行,join前,子线程没有执行完就执行了这句
    
    print('-----')
    print('进程个数:',threading.active_count())
    print('进程个数:',threading.activeCount())
    print('当前进程:',threading.current_thread())
    

    守护线程(后台线程)

    默认情况下,主线程退出之后,那么主线程结束后,子线程也依然会继续执行。如果希望主线程退出后,其子线程也退出而不再执行,则需要设置子线程为守护线程。用setDeamon 方法设置线程为守护线程。

    print('-------守护线程-------')
    
    def run(name):
        for i in range(3):
            print(name,i)
            time.sleep(2)
    
    threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(3)] #用列表生成式 生成10个线程
    
    for t in threads: #启动刚刚创建的10个线程
        t.setDaemon(True)
        t.start()
    
    print('main finished...')
    

    线程同步与互斥锁 LOCK

    同一时间 只有一个线程运行,即使cpu是多核的
    python 线程执行的是c语言写的原生线程,多线程对数据操作时,a线程更改数据后,b线程又再次更改了数据,两次个线程执行的结果不会叠加,只是修改,为了实现数据同步,所以加入全局解释器锁(GIL)
    不添加锁时在乌班图上运行此段代码,num的计数是会不准确的。

    线程同步

    print('-------线程同步与互斥锁-------')
    #添加锁
    lock=threading.Lock()
    num=0
    def run(name):
        lock.acquire() #获取锁
        global num
        num+=1
        lock.release() #释放锁
    threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    for t in threads: #join创建的10个线程
        t.join()
    print('num',num)
    

    互斥锁(递归锁)RLock

    多重锁的时候,也就是两层锁,程序会锁死,这时候要用RLock,程序才能正常执行

    lock=threading.RLock()
    num1,num2=0,0
    
    def run1():
        lock.acquire()  # 获取锁
        global  num1
        num1+=1
        lock.release()  # 释放锁
        return num1
    
    def run2():
        lock.acquire()  # 获取锁
        global  num2
        num2+=1
        lock.release()  # 释放锁
        return num2
    
    def run():
        lock.acquire() #获取锁
    
        run1()
        print('between run1 run2')
        run2()
    
        lock.release() #释放锁
    threads=[ threading.Thread(target=run) for i in range(10)] #用列表生成式 生成10个线程
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    for t in threads: #join创建的10个线程
        t.join()
    
    print('num1:',num1,'num2:',num2)
    

    信号量

    指定固定数量的线程一起执行(其实还是执行完一个 输出一个)

    print('-------信号量-------')
    lock=threading.BoundedSemaphore(3) #只允许三个线程同时一起执行
    def run(tag):
        lock.acquire() #获取锁
        print(tag)
        time.sleep(2)  #执行后等待两秒
        lock.release() #释放锁
    threads=[ threading.Thread(target=run,args=('t%s'%i,)) for i in range(10)] #用列表生成式 生成10个线程
    for t in threads: #启动刚刚创建的10个线程
        t.start()
    for t in threads: #join创建的10个线程
        t.join()
    

    线程的事件Events

    线程控制其他线程的执行,线程间交互
    四个方法:

    1.set 设置标志位 true
    2.clear 清空标志位 flase
    3.等待标志位被设置
    4.is_set 是否设置了标志位
    
    
    print('-------红绿灯-------')
    event=threading.Event()
    def light():   # 设置标志位时 是绿灯
        count=0
        while True:
            if count<5:
                event.set()  # 设置标志位时
                print('\033[42;1m绿灯...%s\033[0m'%count)
            elif count>4:
                event.clear()#清空标志位
                print('\033[41;1m红灯...%s\033[0m'%count)
            time.sleep(1)
            count += 1
            if count==10:
                count=0
    
    thrad_light=threading.Thread(target=light,)
    thrad_light.start()
    
    def car():
        while True:
            if event.is_set(): #判断是否设置了标志位
                print('绿灯了,我过马路咯')
                time.sleep(1)
            else:
                print('红灯了,等着过马路') #标志位flase时,wait 可阻塞当前事件
                event.wait()
    thrad_car=threading.Thread(target=car,)
    thrad_car.start()
    

    queue 队列

    为什么要用队列?

    1. 提高效率 :任务放进队列执行就OK
    2. 完成程序之间的解耦(程序间的依赖关系):执行任务 和 队列 没有关联
    

    方法:

    put:放入队列
    get:获取队列中的数据
    get_nowait:获取为空时,抛出异常
    qsize:获取队列大小
    

    放入取出

    放入

    q=queue.Queue(maxsize=2) #生成队列 并设置最大size
    q.put(1)
    q.put('domain')
    try:
        q.put('alex', block=False) #添加时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常
    except Exception as e:
        print(e)
    print(q.qsize()) #查看当前队列 size
    

    取出

    print(q.get())
    print(q.get())
    try:
        print(q.get(block=False)) #取出时 已经占满最大size了,会阻塞卡住,需要吧block 设置成true,则不会阻塞,直接抛出异常,用get_nowait 效果一样
    except Exception as e:
        print(e)
    
    try:
        print(q.get_nowait())
    except Exception as e:
       pass
    

    出入顺序

    默认的读取顺序是先入先出也就是queue.Queue(),后入先出使用:queue.LifoQueue()

    后入先出

    print('----后入先出----')
    q=queue.LifoQueue() #生成队列
    q.put('alex')
    q.put('domain')
    print(q.get())
    print(q.get())
    

    自定义优先级

    print('----自定义优先级----')
    q=queue.PriorityQueue()
    q.put((1,'domain'))
    q.put((-1,'alex'))
    q.put((3,'searse'))
    print((q.get()))
    print((q.get()))
    print((q.get()))
    

    生产者 消费者

    某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数、线程、进程等)。产生数据的模块称为生产者,而处理数据的模块称为消费者。在生产者与消费者之间的缓冲区称之为仓库。生产者负责产生数据送完仓库,而消费者负责从仓库里取出数据处理消费,这就构成了生产者消费者模式。

    为什么要使用生产者消费者 模型

    1.解耦:两个模块间通过仓库交互,两个模块间的代码互不依赖
    2.并发:两个模块并发,生产者产生数据,仓库中有数据消费者就处理
    
    print('----生产者 消费者----')
    q=queue.Queue(maxsize=10) #生成队列 并设置最大size10
    
    def produce():
        count=0
        while True:
            q.put(count)
            print('产生%s'%count)
            count+=1
            time.sleep(0.5) #0.1秒钟 产生一个
    def consume(name):
        while True:
            data=q.get()
            time.sleep(1)  #1秒钟 消费一个
            print('%s消费了%s'%(name,data))
    
    thread_produce=threading.Thread(target=produce)
    thread_produce=threading.Thread(target=produce)
    thread_consume1=threading.Thread(target=consume,args=('domain',))
    thread_consume2=threading.Thread(target=consume,args=('alex',))
    thread_produce.start()
    thread_consume1.start()
    thread_consume2.start()
    

    进程

    介绍

    1. 语法基本和线程一样
    2. 每一个进程都是由进程启动的,都会有一个父进程

    创建进程,进程中创建线程

    def info():
        print('parent process id:',os.getppid())
        print('current process id:',os.getpid())
    
    def run():
        threads = [threading.Thread(target=info,) for i in range(2)]  # 用列表生成式 生成10个线程
        for t in threads:  # 启动刚刚创建的2个线程
            t.start()      #在当前进程 启动两个线程
            time.sleep(3)
    
    if __name__ == '__main__':
        info()
        processes=[multiprocessing.Process(target=run,) for i in range(1)] #用列表生成式 生成10个线程
        for p in processes:
            p.start()#启动刚刚创建的10个进程
            p.join()#进程结束 主进程也就是当前的程序 才结束
        print('master process finished...')
    

    进程间的通信

    创建子进程时,克隆了一份父进程,子进程更改数据后,在通过中间件返回父进程,实现通信

    使用multiprocessing.Queue 传递数据

    from multiprocessing import Queue,Process,Pipe #进程间队列通信 要import 这个Queue
    
    def fun(arg):
         arg.put('domain')
         arg.put('alex')
    
     if __name__ == '__main__':
         q=Queue()
         p=Process(target=fun,args=(q,))
        p.start()
         print(q.get())
    
    

    使用multiprocessing.Pipe 传递数据

    from multiprocessing import  Process,Pipe
    print('---Pipe---')
    
    def fun(child):
        child.send(['domain','alex'])
        child.send(['12','33'])
        print(child.recv())
        child.close()
    
    if __name__ == '__main__':
        parent,child=Pipe()
        p=Process(target=fun,args=(child,))
        p.start()
        print(parent.recv())
        print(parent.recv())
        parent.send("from parent:hello")
        p.join() # 需写在send 之后
    

    使用Managers 传递数据

    from multiprocessing import  Process,Manager
    import  os
    def run(list,dic):
        list.append(os.getpid())  #给共享的list添加元素
        dic[os.getpid()] = os.getppid()  #给共享的dict添加元素
    
    if __name__ == '__main__':
        with Manager() as manager:
            list=manager.list() #生成一个用于共享的list
            dic=manager.dict()  #生成一个用于共享的dict
            processes = [Process(target=run,args=(list,dic)) for i in range(10)]  # 用列表生成式 生成10个线程
            for p in processes:
                p.start()  # 启动刚刚创建的10个进程
                p.join()  # 进程结束 主进程也就是当前的程序 才结束
            print(list)
            print(dic)
    

    进程中的锁

    进程中不能共享数据,那么这个锁存在的意义的是什么?虽然数据没有共享,但是在打印时共享同一块屏幕,加上锁 确保打印不乱

    from multiprocessing import  Process,Lock
    import  os
    def run(lock,num):
        lock.acquire()
        print(num)
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        processes = [Process(target=run, args=(lock, i)) for i in range(10)]  # 用列表生成式 生成10个线程
        for p in processes:
            p.start()  # 启动刚刚创建的10个进程
            p.join()  # 进程结束 主进程也就是当前的程序 才结束
    

    进程池

    设置固定数量的进程同时执行
    python线程,因为线程池占用资源很小,所有没有线程池。但是进程不一样,前面说过 创建一个进程 要拷贝一份父进程,所以占用资源极其大,需要使用进程池。

    from multiprocessing import Pool
    import os,time
    def run(arg):
        print('子进程:',os.getpid())
        time.sleep(2)
        return os.getpid() # 返回参数给回调方法end
    def end(arg):
        print(arg,'的回调','---',)
    if __name__ == '__main__':
        print('主进程id:',os.getpid())
        pool=Pool(5)
        for i in range(20):
            # pool.apply(fun,) #串行执行
            if i%2==0:
             pool.apply_async(func=run,args=(i,),callback=end) #并行执行,callback,是进程结束后的回调,是主进程调用的回调。
        pool.close() #需先close,再join
        pool.join()  #join: 等待子进程,主线程再结束
        print('main finished...'
    

    协程

    介绍

    1. 用户态(存在用户空间的数据)的轻量级线程(微线程)
    2. 协程能保留上一次状态
    3. 在单线程实现并发,串行操作,函数间来回切换执行
      4.遇到IO操作就切换,IO 操作完了再回来

    通过greenlet 使用协程(手动切换)

    from greenlet import greenlet
    def func1():
        print(1)
        g2.switch() # 输出1后 后切换到协程2执行
        print(2)    # 输出1后 后切换到协程2执行
        g2.switch()
    def func2():
        print(3)
        g1.switch() # 输出3后 后切换到协程1执行
        print(4)
    g1=greenlet(func1) #启动协程1 传入要执行的方法
    g2=greenlet(func2) #启动协程2 传入要执行的方法
    g1.switch()        #切换到协程1,启动协程1,执行func1方法
    

    通过gevent使用协程(自动切换)

    import gevent,time
    def func1():
        print('in the func1 1')
        gevent.sleep(2)  #模拟io 操作需要2s,真实io 操作不需要 写这句,模块会自动判断
        print('in the func1 2')   # 因为这里io 需要时间最久,所以最后才会执行这句,先跳去执行其他的方法了。最后回来执行。也就是整个协程执行的时间 :2s
    def func2():
        print('in the func2 1')
        gevent.sleep(1)
        print('in the func2 2')
    def func3():
        print('in the func3 1')
        gevent.sleep(1)
        print('in the func3 2')
    start_time=time.time()
    gevent.joinall([          # 加入所有协程
        gevent.spawn(func1),
        gevent.spawn(func2),
        gevent.spawn(func3),
    ])
    end_time=time.time()
    expend_time=end_time-start_time
    print(expend_time)
    
    
    

    协程爬取网页

    from urllib import request
    import gevent,time
    from gevent import  monkey
    
    monkey.patch_all()
    def spider(url,save_name):
         respones=request.urlopen(url)
         data=respones.read()
         with open(save_name,'wb') as f:
             f.write(data)
    start_time=time.time()
    gevent.joinall([          # 加入所有协程
        gevent.spawn(spider, 'https://www.baidu.com','baidu.html'),
        gevent.spawn(spider,'https://www.iqiyi.com','iqiyi.html'),
        gevent.spawn(spider,'https://weibo.com/','weibo.html')
    ])
    end_time=time.time()
    expend_time=end_time-start_time
    print('花费时间:',expend_time)
    

    协程并发socket

    服务端

    import sys
    import socket
    import time
    import gevent
    
    from gevent import socket, monkey
    monkey.patch_all()
    def server(port):
        s = socket.socket()
        s.bind(('0.0.0.0', port))
        s.listen(500)
        while True:
            cli, addr = s.accept()
            gevent.spawn(handle_request, cli)
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv:", data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
        except Exception as  e:
            print(e)
        finally:
            conn.close()
    if __name__ == '__main__':
        server(8001)
    
    

    客户端

    import socket
    
    HOST = 'localhost'  # The remote host
    PORT = 8001  # The same port as used by the server
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HOST, PORT))
    while True:
        msg = bytes(input(">>:"), encoding="utf8")
        s.sendall(msg)
        data = s.recv(1024)
        # print(data)
    
        print('Received', repr(data))
    s.close()
    

    100个线程并发

    
    import socket
    import threading
    def sock_conn():
        client = socket.socket()
        client.connect(("localhost",8001))
        count = 0
        while True:
            client.send( ("hello %s" %count).encode("utf-8"))
            data = client.recv(1024)
            print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
            count +=1
        client.close()
    
    
    for i in range(100):
        t = threading.Thread(target=sock_conn)
        t.start()
    

    相关文章

      网友评论

          本文标题:Python 中的进程、线程、协程详解

          本文链接:https://www.haomeiwen.com/subject/iatqdftx.html