美文网首页
Python 基于协程的异步 IO

Python 基于协程的异步 IO

作者: stefanlei | 来源:发表于2018-11-27 10:45 被阅读0次

    转载请注明出处


    异步 IO

    参考链接
    我们使用的是 Python 3 内置的模块 asyncio


    概念说明

    • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

    • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
      例如:

      # 这样就是注册了一个协程
      async def test1():
          pass
          return 'ok'
      
      coroutine = test1()
      
    • task 任务:一个协程对象就是一个原生可以挂起的函数,task 则是对协程进一步封装,其中包含了任务的各种状态

    • future: 代表将来执行或没有执行的任务的结果。它和 task 上没有本质上的区别

    • async/await 关键字:Python3.5 用于定义协程的关键字,async 定义一个协程,await 用于挂起阻塞的异步调用接口。


    定义一个简单的协程

    import asyncio
    
    # 这里定义了一个协程函数
    async def test():
        pass
        return 'ok'
    
    # 这里是提前写好的回调函数
    def test(future):
        print(future)
    
    
    # 这里是返回了一个协程对象
    coroutine = test()
    
    # 把协程对象封装成 task 
    task = asyncio.ensure_future(coroutine)
    
    # 创建消息循环
    loop = asyncio.get_event_loop()
    
    # 设置回调函数,我们通过回调函数,当任务执行完毕,就会把结果传到回调函数中
    task.add_done_callback(test)
    
    # 把 task 加入到消息队列中去运行,这一步一定要到最后
    # 这是一个阻塞式的操作
    loop.run_until_complete(task)
    

    await 关键字

    await就类似于 yield from

    使用 async 可以定义协程对象,使用 await 可以针对耗时的操作进行挂起,就像生成器里的 yield 一样,函数让出控制权。协程遇到 await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行 。

    这里我们用内置的 asyncio.sleep(1) ,模拟网络请求

    import asyncio
    
    
    async def test():
        # 这里就是使用 await 关键字,遇到这个,事件循环就会挂起该协程,然后执行其他协程。
        # 直到其他的协程也挂起或者执行完毕,才继续执行下面的代码,异步执行的,就是说总时间不是叠加,而是算最长的
        await asyncio.sleep(1)
        return 'ok'
    
    
    def a(future):
        print(future.result())
    
    
    coroutine = test()
    task = asyncio.ensure_future(coroutine)
    loop = asyncio.get_event_loop()
    task.add_done_callback(a)
    loop.run_until_complete(task)
    
    

    并发操作(多个协程)

    import asyncio
    import time
    
    start = time.clock()
    
    
    async def test(a):
        await asyncio.sleep(a)
        return 'ok'
    
    
    coroutine1 = test(1)
    coroutine2 = test(1)
    coroutine3 = test(1)
    coroutine4 = test(1)
    
    # 这里就是多个任务。
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
        asyncio.ensure_future(coroutine4),
    ]
    
    loop = asyncio.get_event_loop()
    # loop.run_until_complete(asyncio.gather(*tasks)) 
    # 两个方式都可以,实现并发的时候,要把协程像下面一样并列的执行,
    # 如果有协程嵌套,那么子协程如果要并发,可以使用 await asyncio.gather(*tasks)
    results = loop.run_until_complete(asyncio.wait(tasks))
    end = time.clock()
    print(end - start)
    
    # 可以通过这个来获取每一个,协程的返回值,Python 官方不推荐这样做,不推荐使用返回值,而是直接在协程中完成任务
    for item in results[0]:
        print(item.result())
    
    

    run_until_complete 和 run_forever

    我们一直通过 run_until_complete 来运行 loop ,等到 future 完成,run_until_complete 也就返回了。

    import asyncio
    
    
    async def do_some(n):
        print('等待1秒')
        await asyncio.sleep(n)
        print('完成')
        return 'ok'
    
    
    loop = asyncio.get_event_loop()
    
    coroutine = do_some(1)
    
    task = asyncio.ensure_future(coroutine)
    
    # 等所有的 task 都执行完了,这个函数就会结束,程序退出。
    res = loop.run_until_complete(task)
    print(res.result())
    

    现在改用 run_forever

    import asyncio
    
    async def do_some(n):
        print('等待1秒')
        await asyncio.sleep(1)
        print('完成')
    
    
    coro = do_some(1)
    asyncio.ensure_future(coro)
    loop = asyncio.get_event_loop()
    # 一秒后结果输出,但是,线程一直被阻塞,程序没有退出。
    loop.run_forever()
    
    

    使用run_forever(),future 结束,但是程序并不会退出。run_forever会一直运行,知道 loop.stop()被调用,但是你不能像下面这样调用

    loop.run_forever()
    loop.stop()
    

    run_forever()不返回,stop永远也不会被调用,所以我们只能在协程中调用 stop,如下

    async def do_some(loop):
        print('Waiting 3 second')
        await asyncio.sleep(3)
        print('Done')
        loop.stop()
    

    这样并非没有问题,假设有多个协程在 loop 里面运行

    import asyncio
    
    async def do_some(loop, n):
        print('等待{}秒'.format(n))
        await asyncio.sleep(n)
        print('{} 完成'.format(n))
        
        # 第二个协程还没有结束,loop 就停止了,被先完成的那个协程给 stop 
        loop.stop()
    
    
    loop = asyncio.get_event_loop()
    coro1 = do_some(loop, 1)
    coro2 = do_some(loop, 2)
    asyncio.ensure_future(coro1)
    asyncio.ensure_future(coro2)
    
    loop.run_forever()
    

    要解决这个问题,那我们就要使用 gather把多个协程合并为一个 future ,并添加回调。然后在回调中停止。

    asyncio.gather 和 asyncio.ensure_future 可以把新的任务加入到事件循环当中

    import asyncio
    import functools
    
    
    async def do_some(n):
        print('等待{}秒'.format(n))
        await asyncio.sleep(n)
        print('{} 完成'.format(n))
    
    # 这个 future 是我们执行后的结果集
    def done_callback(loop, future):
        loop.stop()
        pass
    
    
    loop = asyncio.get_event_loop()
    futures = asyncio.gather(do_some(1), do_some(3))
    # 这里式使用了偏函数,当然我们也可以使用闭包
    futures.add_done_callback(functools.partial(done_callback, loop))
    loop.run_forever()
    
    --------------------------------------------
    # 使用闭包
    
    def done_callback(loop):
        def inner(future):
            loop.stop()
        return inner
    
    loop = asyncio.get_event_loop()
    futures = asyncio.gather(do_some(1), do_some(3))
    futures.add_done_callback(done_callback(loop))
    loop.run_forever()
    ---------------------------------------------
    
    

    这样就可以啦,其实我们这里就是手动的实现了 run_until_complete,而且 run_until_complete内部也是调用了run_forever


    Close Loop ?

    以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢? 简单来说,loop 只要不关闭,就还可以再运行。:

    loop.run_until_complete(do_some_work(loop, 1))
    loop.run_until_complete(do_some_work(loop, 3))
    loop.close()
    

    但是如果关闭了,就不能再运行了:

    loop.run_until_complete(do_some_work(loop, 1))
    loop.close()
    # 此处异常
    loop.run_until_complete(do_some_work(loop, 3))  
    
    建议调用 loop.close,以彻底清理 loop 对象防止误用。
    

    生产者消费者

    import asyncio
    from asyncio import Queue
    
    
    async def producer(queue):
        print('开始生成数据')
        for i in range(100):
            await queue.put(i)
            print('已经生产数据 {}'.format(i))
    
            if queue.qsize() >= 10:
                await queue.join()
    
    
    async def comsumer(queue):
        print('开始消费数据')
        while True:
            item = await queue.get()
            print('消费了数据 {}'.format(item))
            queue.task_done()
    
    loop = asyncio.get_event_loop()
    queue = Queue()
    loop.run_until_complete(asyncio.gather(producer(queue), comsumer(queue)))
    
    

    协程 API

    loop = asyncio.get_event_loop()  # 获取事件循环
    loop = run_until_complete()  # 开启循环
    asyncio.ensure_future()  # 把协程加入到循环,直到调用 loop.run_forever() 才会执行
    asyncio.gather()  # 把更多的协程加入到循环
    asyncio.Semaphore(5) # 限制同时运行的协程数量
    

    Semaphore 例子

    import asyncio
    import aiohttp
    
    
    # Semaphore 只要放在协程里面就可以,用 async with 修饰,windows 限制了 Semap 的数量,Linux 没有限制。
    
    class AsyncRequest(object):
        sem = asyncio.Semaphore(500)
        @classmethod
        async def request(cls, url=None, method='GET', cookies=None, params=None, data=None, headers=None, encode='utf8'):
            async with cls.sem:
                async with aiohttp.ClientSession(cookies=cookies) as session:
                    if method == 'GET':
                        async with session.get(url, params=params, headers=headers) as response:
                            return {
                                'text': await response.text(encoding=encode),
                                'url': response.url,
                                'headers': response.headers,
                                'read': await response.read(),
                                'status_code': response.status,
                                'cookies': response.cookies
                            }
                    elif method == 'POST':
                        async with session.post(url, data=data, headers=headers) as response:
                            return {
                                'text': await response.text(encoding=encode),
                                'url': response.url,
                                'headers': response.headers,
                                'read': await response.read(),
                                'status_code': response.status,
                                'cookies': response.cookies
                            }
    
    

    相关文章

      网友评论

          本文标题:Python 基于协程的异步 IO

          本文链接:https://www.haomeiwen.com/subject/deawqqtx.html