美文网首页
Python协程

Python协程

作者: 开心的小哈 | 来源:发表于2022-07-02 22:10 被阅读0次

    1,协程



    概念:协程不是计算机提供的,而是程序员人文创造
    协程(Coroutine),也可以被称为微线程,是一种用户态的上下文切换技术,简而言之,其实就是通过一个线程实现代码块相互切换执行如:
    def func1():
        print(1)
        #该方法遇到阻塞可以切换到函数2中进行使用
        print(2)
    def func2():
        print(3)
    
        print(4)
    
    func1()
    func2()
    

    实现协程有如下几种方法:

    • greenlet,早期模块
    • yield关键字.
    • asyncio 装饰器(py3.4)
    • async , await 关键字(py3.5)[推荐]

    1. 1greenlet 实现协程

    pip install greenlet

    from greenlet import greenlet
    def func1():
        print(1) # 第1步 输出1
        #该方法遇到阻塞可以切换到函数2中进行使用
        gr2.switch() # 第2步:切换到func2中 并执行
        print(2) # 第五步 输出2
        gr2.switch() #第六步 切换 func2
    def func2():
        print(3) #第三步:输出3
        gr1.switch() # 第四步:切换回func1 并执行
        print(4) # 第七步:输出4
    
    
    gr1=greenlet(func1)
    gr2=greenlet(func2)
    
    gr1.switch() # 第0步,切换func1并执行
    

    1.2 yield关键字

    def func1():
        yield 1
        yield from func2()
        yield 2
    def func2():
        yield 3
        yield 4
    
    f1=func1()
    for item in f1:
        print(item)
    

    1.3asyncio(在python3.4之后的版本)

    import asyncio
    import time
    @asyncio.coroutine
    def func1():
        print(1)
        yield  from asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
        print(2)
    
    @asyncio.coroutine
    def func2():
        print(3)
        yield  from asyncio.sleep(2)
        print(4)
    
    @asyncio.coroutine
    def func3():
        print(5)
        yield  from asyncio.sleep(2)
        print(6)
    
    tasks=[
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() ),
        asyncio.ensure_future( func3() )
    ]
    
    # 协程函数使用 func1()这种方式是执行不了的
    start=time.time()
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # loop.run_until_complete(func1()) 执行一个函数
    end=time.time()
    print(end-start) # 只会等待3秒
    

    注意:遇到IO等耗时操作会自动切换

    1.4asyc&await 关键字

    在python3.5及以后的版本.

    import asyncio
    import time
    
    async def func1():
        print(1)
        await asyncio.sleep(3) # 遇到耗时后会自动切换到其他函数中执行
        print(2)
    
    
    async def func2():
        print(3)
        await asyncio.sleep(2)
        print(4)
    
    
    async def func3():
        print(5)
        await asyncio.sleep(2)
        print(6)
    
    tasks=[
        asyncio.ensure_future( func1() ),
        asyncio.ensure_future( func2() ),
        asyncio.ensure_future( func3() )
    ]
    
    # 协程函数使用 func1()这种方式是执行不了的
    start=time.time()
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # loop.run_until_complete(func1()) 执行一个函数
    end=time.time()
    print(end-start) # 只会等待3秒
    

    协程意义



    在一个线程中如果遇到IO等待时间,线程不会傻傻等,利用空闲的时候再去干点其他事情;
    案例:去下载三张图片(网络IO)
    • 普通方式
    import requests
    import time
    def downlod_image(url):
        print('开始下载')
        res=requests.get(url).content
        print('下载完成')
        # 图片保存到本地文件
        fileName=url.rsplit('-')[-1]
        with open(fileName,'wb') as wf:
            wf.write(res)
    
    url_list=[
        'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
        'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
        'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
    ]
    
    if __name__=='__main__':
        start=time.time()
        for item in url_list:
            downlod_image(item)
        end=time.time()
        print('本次耗时',end-start) # 本次耗时 1.5057339668273926
    
    • 协程方式(异步)
    import asyncio
    
    import requests
    import time
    import aiohttp
    async def downlod_image(session,url):
        print('开始下载')
        async with session.get(url,verify_ssl=False) as res:
            content=await res.content.read()
            print('下载完成')
            # 图片保存到本地文件
            fileName=url.rsplit('-')[-1]
            with open(fileName,'wb') as wf:
                wf.write(content)
    
    url_list=[
        'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
        'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
        'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
    ]
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks=[asyncio.create_task(downlod_image(session,url)) for url in url_list]
            await asyncio.wait(tasks)
    if __name__=='__main__':
        start=time.time()
        asyncio.run(main())
        end=time.time()
        print("本次耗时",end-start) # 本次耗时 0.3856923580169678
    

    3. 异步编程


    3.1 事件循环

    理解成一个死循环,去检测并执行某些代码.

    任务列表=[任务1,任务2,任务3 ..... ]
    while True:
      可执行的人物列表,已完成的任务列表=去任务列表中检查所有任务,将'可执行'和'已完成'的任务返回
      for 就绪任务 in 可执行任务列表:
        执行已就绪的任务
      for 已完成的任务 in 已完成的任务列表
        任务列表中移除 已完成
    如果 任务列表 中的任务都已完成, 终止循环
    
    import asyncio
    # 去省城一个获取事件循环
    loop=asyncio.get_event_loop()
    # 将任务放到'任务列表'
    loop.run_until_complete(任务列表)
    
    

    3.2 快速上手

    协程函数:定义函数时候 async def 函数名 称为协程函数
    协程对象:执行 协程函数() 得到线程对象.

    async def func(): # 协程方法
      pass
    
    result=func() # 协程对象,内部代码不会执行的
    

    如果想要运行携程函数内部代码,必须要将协程对象交给事件循环来处理

    import asyncio
    async def func(): # 协程方法
      print('函数哈')
    
    result=func() # 协程对象,内部代码不会执行的
    
    # 去省城一个获取事件循环
    # loop=asyncio.get_event_loop()
    # 将任务放到'任务列表'
    # loop.run_until_complete(result)
    # python3.7可以是使用
    asyncio.run(result)
    

    3.3 await

    await+可等待的对象(协程对象,Future,Task对象) ~=IO等待
    示例1:

    import asyncio
    async def func():
      print('1111111')
      res=await asyncio.sleep(2)# 假设发送一个网络请求
      print('end',res)
    
    asyncio.run(func())
    

    示例2:

    import asyncio
    async def others():
      print('start')
      await asyncio.sleep(2)
      print('end')
      return  '返回值'
    
    async def func():
      print('执行协程函数内部代码')
      respo=await others()
      print('IO请求结束结果为',respo)
    
    asyncio.run(func())
    
    
    

    示例3

    import asyncio
    async def others():
      print('start')
      await asyncio.sleep(2)
      print('end')
      return  '返回值'
    
    async def func():
      print('执行协程函数内部代码')
      respo1=await others()
      print('IO请求结束结果为1',respo1)
      respo2=await others()
      print('IO请求结束结果为2',respo2)
    
    asyncio.run(func())
    
    执行协程函数内部代码
    start
    end
    IO请求结束结果为 返回值
    start
    end
    IO请求结束结果为 返回值
    
    

    await 就是等待对应的值得到结果后再继续向下走

    3.4 Task对象

    在事件循环中添加多个任务的.
    Task用于并发调度协程,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行.除了使用asyncio.create_task()函数以外,还可以用底层级的loop.create_task()或asyncio.ensure_future()函数.不建议手动实例化Task对象.
    注意:asyncio.create_task()函数在python3.7中被加入,在python3.7之前可以改用底层级的asyncio.ensure_future()函数
    示例1:

    import asyncio
    
    async def func():
      print(1)
      await asyncio.sleep(2)
      print(2)
      return 'func111'
    
    async def main():
      print('start main')
      # 创建task对象,将当前执行func函数任务添加到事件循环中.
      task1=asyncio.create_task(func())
      task2=asyncio.create_task(func())
      print('main end')
      ret1=await task1
      ret2=await task2
      print(ret1,ret2)
    
    asyncio.run(main())
    

    示例2:

    import asyncio
    
    async def func():
      print(1)
      await asyncio.sleep(2)
      print(2)
      return 'func111'
    
    async def main():
      print('start main')
      # 创建task对象,将当前执行func函数任务添加到事件循环中.
      task_list=[
        asyncio.create_task(func(),name='n1'),
        asyncio.create_task(func(),name='n2')
      ]
      print('main end')
      don,pending=await asyncio.wait(task_list,timeout=None) # 最多等待1秒,如果任务没有完成,那么返回结果就是空的
      print(don)
    
    
    asyncio.run(main())
    

    示例3

    import asyncio
    
    async def func():
      print(1)
      await asyncio.sleep(2)
      print(2)
      return 'func111'
    
    
    
    # 创建task对象,将当前执行func函数任务添加到事件循环中.
    task_list=[
        func(),
       func(),
      ]
    
    
    
    done,paen=asyncio.run(asyncio.wait(task_list)) # 该函数run内部会创建事件循环loop
    

    3.5 asyncio.Future对象

    Task继承Future,Task对象内部await结果的处理基于Future对象来的
    示例1:

    import asyncio
    async def main():
        # 获取当前事件循环
        loop=asyncio.get_running_loop()
        # 创建一个任务(Future对象),这个任务什么也没有干
        fut=loop.create_future()
        # 等待任务最终结果(Future对象),没有结果则会一直等待下去
        await fut
    asyncio.run(main())
    

    示例2:

    import asyncio
    async def set_after(fut):
        await asyncio.sleep(2)
        fut.set_result('666')
    
    async def main():
        loop=asyncio.get_running_loop()
        fut=loop.create_future()
        await loop.create_task(set_after(fut))
        data=await fut
        print(data)
    
    asyncio.run(main())
    

    Task和Future的区别:如果Future没有设置返回值那么await就会一直等待,Task函数执行完成后默认会调用set_result('666')来结束

    3.5concurrent.futures.Future对象

    使用线程池,进程池实现异步操作时用到的对象.
    示例1:

    import time
    from concurrent.futures import Future
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor
    
    def func(value):
        time.sleep(2)
        print(value)
        return 123
    
    pool =ThreadPoolExecutor(max_workers=5)
    for i in range(10):
        fut = pool.submit(func,i)
        print(fut)
    

    以后写代码可能会存在交叉使用.例如:CRM项目80%都是给予协程异步编程+MySQL(不支持)[线程,进程做异步编程]

    import time
    import asyncio
    import concurrent.futures
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return 'sd'
    
    async def main():
        loop=asyncio.get_running_loop()
        # 第一步内部会先调用ThreadPoolExecutor的submit方法取线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
        # 第二步调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象.
        # 应为concurrent.futures.Future对象不支持await语法,所以需要包装为asycio.Future对象才能使用.
        fut=loop.run_in_executor(None,func1)
        result=await fut
        print('default thread pool',result)
    
    asyncio.run(main())
    

    示例2 案例:asyncio+不支持异步的模块

    # 案例:asyncio+不支持异步的模块
    import asyncio
    import requests
    import time
    import aiohttp
    async def downlod_image(url):
        print('开始下载')
        loop=asyncio.get_event_loop()
        # 开启线程进行请求后转换成协程进行等待
        future=loop.run_in_executor(None,requests.get,url)
        res=await future
        print('下载完成')
        # 图片保存到本地文件
        fileName=url.rsplit('-')[-1]
        with open(fileName,'wb') as wf:
            wf.write(res.content)
    
    url_list=[
        'http://pic.netbian.com/uploads/allimg/220629/224839-1656514119b359.jpg',
        'http://pic.netbian.com/uploads/allimg/220420/114427-16504262671afd.jpg',
        'http://pic.netbian.com/uploads/allimg/220623/234915-16559993552953.jpg'
    ]
    def main():
        tasks=[downlod_image(url) for url in url_list]
        loop=asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    
    if __name__=='__main__':
        start=time.time()
        main()
        end=time.time()
        print("本次耗时",end-start) # 本次耗时 0.4465653896331787
    

    3.6 异步迭代器

    什么是异步迭代器
    实现了aiter()和__anext()方法的对象。anext必须返回一个awaitable对象。async__for会处理异步迭代器的anext()方法所返回的可等待对象。知道引发一个StopAsyncIteration异常。由PEP492引入
    什么是异步迭代对象?
    可在async_for语句中呗使用的对象。必须通过他的aiter()方法返回一个asynchronous iterator由PEP492引入

    import asyncio
    class Reader(object):
        """自定义的异步迭代对象(同时也是异步可迭代对象)"""
        def __init__(self):
            self.count=0
    
        async def readline(self):
            # await asyncio.sleep(2)
            self.count+=1
            if self.count==100:
                return None
            return self.count
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            val=await self.readline()
            if val == None:
                raise StopAsyncIteration
            return val
    
    async def func():
        obj=Reader()
        async for item in obj:
            print(item)
    asyncio.run(func())
    
    

    3.8异步的上下文管理器

    此种对象通过定义aenter()和aexit()方法来对async with语句中的环境进行控制。

    import asyncio
    class AsyncContextManger(object):
        def __init__(self):
            self.conn=0
    
        async def do_something(self):
            # 异步操作数据库
            return 666
        async def __aenter__(self):
            self.conn=await asyncio.sleep(1)
            # 异步连接数据库
            return self
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # 异步关闭数据库
            await asyncio.sleep(1)
    
    # obj=AsyncContextManger()
    # async with AsyncContextManger() as f:
    #     rel=  f.do_something()
    #     print(rel)
    #     pass
    #  不可以直接运行,必须嵌套async函数中
    async def func():
        async with AsyncContextManger() as f:
            rel=f.do_something()
            print(rel)
    asyncio.run(func())
    
    
    

    4.uvloop



    是asyncio的事件循环的代替方案,事件循环>默认asyncio的事件循环。
    pip install uvloop
    使用
    import uvloop
    import asyncio
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    # 编写asyncio的代码 与之前一样
    # 内部的事件循环自动化会变为uvloop
    asyncio.run(...)
    

    注意事项:一个asgi->uvicorn 内部使用的就是uvloop

    协程的案例请查看:

    5. 实战案例


    5.1异步redis

    在使用python代码操作redis时,链接/操作/断开都是网络IO。
    pip install aioredis
    使用

    import aioredis
    import asyncio
    async def execute(address,password):
        print('start-',address)
        # 网络IO操作:先去链接47.93.4.189:6666,遇到IO自动切换任务,去连接47.93.4.198.2712
        redis=await aioredis.create_redis_pool(address,password=password)
        # 网络IO操作:遇到IO会自动切换任务
        await redis.hmset_dict('car',key=1,key2=2,key3=3)
        # 网络IO操作:遇到IO会自动切换任务
        result=await redis.hgetall('car',encoding='utf-8')
        print(result)
        redis.close()
        # 网络IO操作:遇到IO会自动切换任务
        await redis.wait_closed()
        print('end-',address)
    task_list=[
        execute('redis://47.93.4.189:6666','123456'),
        execute('redis://47.93.4.198.2712','123456')
    ]
    
    asyncio.run(asyncio.wait(task_list))
    

    5.2 异步MySQL

    pip install aiomysql
    示例1:

    import asyncio
    import aiomysql
    async def execute():
        # 网络IO操作:链接mysql
        conn=await aiomysql.connect(host='127.0.0.1',port=3306,user='root',password='123',db='mysql')
        # 网络IO操作,创建CURSOR
        cur=await conn.cursor()
        # 网络IO操作,执行SQL
        await cur.execute('select * from user')
        # 网络IO操作,执行SQL
        result=await cur.fetchall()
        print(result)
        # 网络IO操作,执行SQL
        await cur.close()
        conn.close()
    
    asyncio.run(execute())
    

    示例2

    import asyncio
    import aiomysql
    async def execute(host,password):
        # 网络IO操作:链接mysql
        conn=await aiomysql.connect(host=host,port=3306,user='root',password=password,db='mysql')
        # 网络IO操作,创建CURSOR
        cur=await conn.cursor()
        # 网络IO操作,执行SQL
        await cur.execute('select * from user')
        # 网络IO操作,执行SQL
        result=await cur.fetchall()
        print(result)
        # 网络IO操作,执行SQL
        await cur.close()
        conn.close()
    task_list=[
        execute('127.0.0.1','123456'),
    execute('192.168.3.112','1a23456')
    ]
    
    asyncio.run(asyncio.wait(task_list))
    

    5.3FastAPI框架

    pip install fastapi
    pip install uvicornasgi内部基于uvloop

    相关文章

      网友评论

          本文标题:Python协程

          本文链接:https://www.haomeiwen.com/subject/zvfkbrtx.html