美文网首页
asyncio并发编程-上

asyncio并发编程-上

作者: 码农小杨 | 来源:发表于2020-02-28 18:10 被阅读0次

    asyncioPython中解决异步I/O高并发的一个模块。

    asyncio的事件循环

    我们先看下asyncio有哪些功能:

    1. 包含各种特定系统实现的模块化事件循环(针对不同系统都能兼容的事件循环:例如Windows下的select,linux下的epoll。)

    2. 传输和协议抽象(对TCP和UDP协议的抽象)

    3. 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持

    4. 模仿futures模块但适用于事件循环使用的Future

    5. 基于yield from的协议和任务,可以让我们使用顺序的方式编写并发代码

    6. 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件迁移到线程池

    7. 模仿threading模块中的同步原语,可以用在单线程内的协程之间

    前面我们学习了协程,但是协程脱离事件循环意义就不是很大了。

    下面我们开始学习asyncio的使用吧!💪

    首先我们明确一点,高并发异步IO编程的编码模式由三部分组成:

    事件循环+回调(驱动生成器)+epoll(IO多路复用)

    asyncioPython用于解决异步io编程的一整套解决方案

    有趣的小知识:

    tornado也是基于asyncio的异步框架,通过协程和事件循环来完成高并发。相对于DjangoFlask这种传统的阻塞IO框架本身不提供web服务器,不会去完成Socket编码的,因此我们在部署的时候会搭配实现了SOcket编码的框架(uwsgi, gunicorn+nginx)。Tornado实现了自己的web服务器,因此我们部署Tornado的时候是可以直接部署的(会使用epoll来完成socket请求),但是真正部署的时候,还是会使用nginx来完成一些操作(IP限制等)。因此Tornado的数据库驱动就不能使用阻塞IO驱动框架了。

    asyncio的简单使用:

    协程要搭配事件循环才能使用

    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
    
    if __name__ == "__main__":
        start_time = time.time()
        
        # 我们使用 asyncio 实现的事件循环 这个loop就可完成 之前我们自己实现的 事件循环 select 的操作
        loop = asyncio.get_event_loop()
        
        # 可以使用 run_until_complete 进行协程的调用 这是一个阻塞函数 可以理解为多线程编程中的jion方法 然后把 asyncio理解为协程池
        loop.run_until_complete(get_html("http://www.imooc.com"))
        print(time.time()-start_time)
        
    # 输出
    start get url
    end get url
    2.0019102096557617
    

    我们可以同时执行多个协程,传入一个可迭代的任务对象

    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        print("end get url")
    
    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.get_event_loop()
        
        # 这个 tasks 可以是不同的协程
        tasks = [get_html("http://www.imooc.com") for i in range(10)]
        
        # asyncio.wait()函数会接收一个可迭代对象
        loop.run_until_complete(asyncio.wait(tasks))
        print(time.time()-start_time)
    
    # 输出就不打印了 耗时大概两秒
    

    注意:在协程中不能使用同步的时间睡眠 time.sleep(),否则当执行的协程超过一个的时候就会出现同步阻塞的情况。

    要是哪个小伙伴想测试下上面那句话,可以将上面的代码await asyncio.sleep(2)改为time.sleep(2)你会发现运行的时间不再是两秒了,而是20+秒。

    为什么不能再协程使用同步的sleep呢?

    这就要说到我们的loop小朋友了,协程要配合事件循环的,我们在运行协程的时候当遇到await关键字就知道这是一个异步阻塞操作了,会在此处暂停返回一个Future对象,然后由loop小朋友再执行已经可以运行的协程。这样保证了能够异步执行操作。当我们直接在协程中使用sleep同步操作时候,不会暂停而是一直等待,这就是原因😊

    如何获得协程的返回值呢?
    import asyncio
    import time
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        return "红烧肉"
    
    
    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.get_event_loop()
        
        # 这里使用 asyncio.ensure_future 来获得一个future对象 是不是很像多线程编程中的 submit
        get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
        
            # 也可以使用 loop 的 create_task 两者用法一样
        # task = loop.create_task()
        # task 是 future 的子类
        
        # 可以将future 对象传入到 run_until_complete
        loop.run_until_complete(get_future)
        # 通过 future 对象的 result函数获得结果
        print(get_future.result())
        
    # 输出
    start get url
    红烧肉
    

    上面的代码还可以这么写

    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.get_event_loop()
        # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
        task = loop.create_task(get_html("http://www.imooc.com"))
        loop.run_until_complete(task)
        print(task.result())
    

    我们看到使用loop.create_taskasyncio.ensure_future是一样的效果,具体区别我们稍后会学习到。💪

    有没有小伙伴怀疑,当使用asyncio.ensure_future的时候是何时和我们创建的loop建立联系的呢,是在loop.run_until_complete(get_future)的时候吗?

    让我们看下ensure_future的源码:

    def ensure_future(coro_or_future, *, loop=None):
        """Wrap a coroutine or an awaitable in a future.
    
        If the argument is a Future, it is returned directly.
        """
        if futures.isfuture(coro_or_future):
            if loop is not None and loop is not coro_or_future._loop:
                raise ValueError('loop argument must agree with Future')
            return coro_or_future
        elif coroutines.iscoroutine(coro_or_future):
            # 看这里 当没有loop传入的时候,会获得当前loop 因为线程中只有这个一个 loop 这里启动loop和外层代码的loop是同一个loop
            if loop is None:
                loop = events.get_event_loop()
                
            # 我们看到 内部同样是使用 create_task
            task = loop.create_task(coro_or_future)
            if task._source_traceback:
                del task._source_traceback[-1]
            return task
        elif compat.PY35 and inspect.isawaitable(coro_or_future):
            return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
        else:
            raise TypeError('A Future, a coroutine or an awaitable is required')
    
    

    除了上面直接调用协程,我们还可以在协程执行完成之后进行一个回调。

    import asyncio
    import time
    from functools import partial
    
    
    async def get_html(url):
        print("start get url")
        await asyncio.sleep(2)
        return "红烧肉"
    
    
    # 当我们想要在 回调函数中传递参数的时候  注意 future 参数写在最后
    def callback(url, future):
        print(url)
        print("send email to 红烧肉")
    
    
    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.get_event_loop()
        # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
        task = loop.create_task(get_html("http://www.imooc.com"))
        
        task.add_done_callback(partial(callback, "http://www.imooc.com"))
        
        loop.run_until_complete(task)
        
        print(task.result())
    

    我们使用partial将传入的参数,伪造成一个函数。

    回调函数会默认接收一个 future 对象参数

    wait和gather

    我们上面已经使用了wait来进行多协程的运行,我们看下它的源码:

    
    FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
    FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
    ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
    
    
    @coroutine
    def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
        """Wait for the Futures and coroutines given by fs to complete.
    
        The sequence futures must not be empty.
    
        Coroutines will be wrapped in Tasks.
    
        Returns two sets of Future: (done, pending).
    
        Usage:
    
            done, pending = yield from asyncio.wait(fs)
    
        Note: This does not raise TimeoutError! Futures that aren't done
        when the timeout occurs are returned in the second set.
        """
        if futures.isfuture(fs) or coroutines.iscoroutine(fs):
            raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
        if not fs:
            raise ValueError('Set of coroutines/Futures is empty.')
        if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError('Invalid return_when value: {}'.format(return_when))
    
        if loop is None:
            loop = events.get_event_loop()
    
        fs = {ensure_future(f, loop=loop) for f in set(fs)}
    
        return (yield from _wait(fs, timeout, return_when, loop))
    

    这个wait我们理解为多线程中的wait,同样存在return_when参数,可以指定何时返回。

    gather如何使用呢?

    if __name__ == "__main__":
        start_time = time.time()
        loop = asyncio.get_event_loop()
        tasks = [get_html("http://www.imooc.com") for i in range(10)]
        loop.run_until_complete(asyncio.gather(*tasks))
        print(time.time()-start_time)
    
    

    我们将wait直接修改为gather然后可迭代对象加上*即可。

    两者的区别是什么呢?

    1. gather更加hight-level
    2. gather可以将协程分组
    group1 = [get_html("http://projectsedu.com") for i in range(2)]
    group2 = [get_html("http://www.imooc.com") for i in range(2)]
    
    # 我们可以分组传递
    loop.run_until_complete(asyncio.gather(*group1, *group2))
    
    # 我们可以将先进行gather操作
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))
    
    # 我们可以批量取消某个分组
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    
    group2.cancel()
    
    task取消和子协程调用原理

    我们先看下run_until_completerun_forever两个函数的区别。

    run_until_complete在运行完指定的协程之后就会停止,而run_forever则会一直运行。

    看下源码:

    image.png

    在图片中我们看到run_until_complete里面同样使用了run_forever。但是,增加了一个回调_run_until_complete_cb

    def _run_until_complete_cb(fut):
        exc = fut._exception
        if (isinstance(exc, BaseException) and not isinstance(exc, Exception)):
            # Issue #22429: run_forever() already finished, no need to
            # stop it.
            return
        fut._loop.stop()
    

    在回调函数中当没有协程运行的时候会将loop即事件循环直接暂停。

    asyncio会将loop放到future中,而future同样会被放到loop中。

    因此我们可以在任何一个任务中停止掉 loop

    如何取消协程中的task(future)
    import asyncio
    
    
    async def get_html(sleep_times):
        print("waiting")
        await asyncio.sleep(sleep_times)
        print("done after {}s".format(sleep_times))
    
    
    if __name__ == "__main__":
        task1 = get_html(2)
        task2 = get_html(3)
        task3 = get_html(3)
    
        tasks = [task1, task2, task3]
    
        loop = asyncio.get_event_loop()
    
        try:
            loop.run_until_complete(asyncio.wait(tasks))
        # 我们发送一个 controle + c 异常
        except KeyboardInterrupt as e:
            # 获得所有的task
            all_tasks = asyncio.Task.all_tasks()
            for task in all_tasks:
                print("cancel task")
                
                # 将task 取消 返回布尔值
                print(task.cancel())
            
                    # 先将 loop 暂停
            loop.stop()
            
            # 记得将 loop 再次运行 run_forever 否则将报错
            loop.run_forever()
        finally:
            # 最后 关闭 loop
            loop.close()
    

    有咩有小伙伴对all_tasks = asyncio.Task.all_tasks()这句代码疑惑?

    image.png

    我们看了源码就知道了 因为全局只有一个loop,所以能够在任何位置轻松获得loop相关的信息。

    如何在协程中插入子协程

    我们看一段官方文档的代码:

    官方文档叫chain coroutines链式协程?

    import asyncio
    
    async def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        await asyncio.sleep(1.0)
        return x + y
    
    async def print_sum(x, y):
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()
    

    compute() is chained to print_sum(): print_sum() coroutine waits until compute() is completed before returning its result.

    序列图:

    image.png

    图中展示大致意思:当我们运行一个协程的时候,立即创建一个Task,由EventLoop驱动Task,然后Task驱动print_sum。当协程中调用了另外一个子协程的时候,是直接由Task和子协程通信的。直至子协程运行完毕抛出StopIteration异常,然后父协程会捕捉到异常并提取出结果,父协程运行完毕,同样抛出异常,逐层往上抛出然后终止Task。重点在于Task和子协程compute之间的通道,以及异常抛出拦截。

    The “Task” is created by the AbstractEventLoop.run_until_complete() method when it gets a coroutine object instead of a task.

    意思是,图中的Task并不是一个任务而是一个协程对象。

    The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses AbstractEventLoop.call_later() to wake up the task in 1 second.

    意思是,图大致讲了如何在协程中调用子协程,但是内部实现没有体现出来。

    例如:当调用asyncio.sleep(1.0)的时候会创建一个内部的future对象然后使用 AbstractEventLoop.call_later() 在一秒后唤醒任务。

    asyncio中的其他函数
    call_soon 函数
    import asyncio
    
    
    def callback(sleep_times):
        print(f"success time {sleep_times}")
    
    
    def stoploop(loop):
        loop.stop()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
    
        # 这里传入的是函数名称 不是协程 因为很多时候 我们希望在循环体系中插入一个函数
        # call_soon 是即刻执行 比不是下一行代码执行 而是等到下一个循环的时候执行
        loop.call_soon(callback, 2)
    
        # 停止时间循环
        loop.call_soon(stoploop, loop)
    
        # 因为我们传入的不是协程 而是函数 因此启动要使用  run_forever
        loop.run_forever()
    
    call_later 函数
    import asyncio
    
    
    def callback(sleep_times):
        print(f"success time {sleep_times}")
    
    
    def stoploop(loop):
        loop.stop()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
    
        # call_later 是延迟调用
        loop.call_later(2, callback, 2)
        loop.call_later(1, callback, 1)
        loop.call_later(3, callback, 3)
    
        loop.run_forever()
        
    # 输出
    success time 1
    success time 2
    success time 3
    

    从输出看出 call_later并不是根据添加的顺序执行的 而是根据延迟的时间。

    为了进一步比较call_latercall_soon的区别我们看下下面代码的输出

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
    
        # call_later 是延迟调用
        loop.call_later(2, callback, 2)
        loop.call_later(1, callback, 1)
        loop.call_later(3, callback, 3)
    
        loop.call_soon(callback, 4)
    
        loop.run_forever()
    # 输出
    
    success time 4
    success time 1
    success time 2
    success time 3
    

    我们看到call_soon执行是比call_later要早的 是下个循环立即执行

    call_at函数

    call_at函数可以让我们指定时间运行回调函数,这里的时间是 loop里面的时间 不是传统的时间

    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
            
            # 获得loop的当前时间
        loop_time = loop.time()
    
        # 使用 call_at 在 当前时间的基础上 延迟几秒执行回调
        loop.call_at(loop_time + 2, callback, 2)
        loop.call_at(loop_time + 1, callback, 1)
        loop.call_at(loop_time + 3, callback, 3)
    
        loop.call_soon(callback, 4)
    
        loop.run_forever()
        
    # 输出
    success time 4
    success time 1
    success time 2
    success time 3
    
    call_soon_threadsafe 函数

    这是一个线程安全的函数 作用和 call_soon一样

    asyncio是可以在多线程环境下运行的,asyncio是一整套的异步IO解决方案,不仅可以解决协程调度问题,还可以解决线程、进程问题。

    def call_soon_threadsafe(self, callback, *args):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        if self._debug:
            self._check_callback(callback, 'call_soon_threadsafe')
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_tracebac
        # 又这个函数实现线程安全的
        self._write_to_self()
        return handle
    

    当我们在多线程中 多个回调函数使用了一个变量 可以使用这个来保证线程安全

    相关文章

      网友评论

          本文标题:asyncio并发编程-上

          本文链接:https://www.haomeiwen.com/subject/syxrhhtx.html