美文网首页
异步操作之asyncio(二)

异步操作之asyncio(二)

作者: 长歌行夜一 | 来源:发表于2019-04-14 23:27 被阅读0次

    尽管asyncio应用通常作为单线程运行,不过仍被构建为并发应用。由于I/O以及其他外部事件的延迟和中断,每个协程或任务可能按一种不可预知的顺序执行。为了支持安全的并发执行,asyncio包含了threading和multiprocessing模块中的一些底层原语的实现。

    1.锁(LOCK)

    锁可以用来保护对一个共享资源的访问。只有锁的持有者可以使用这个资源。如果有多个请求要的到这个锁,那么其将会阻塞,以保证一次只有一个持有者。

    import asyncio

    from functoolsimport partial

    def unlock(lock):

        print("callback释放锁")

        lock.release()

    async def coro1(lock):

        print("并行中,coro1等待锁")

        # with语句有自动释放锁的作用

        async with lock:

            print("coro1被锁了")

        print("coro1的锁释放了",lock.locked())

    async def coro2(lock):

        print("并行中,coro2等待锁")

        # 等待加锁

        try:

            await lock.acquire()

            print(f"当前coro2是否被锁", lock.locked())

        finally:

            lock.release()

            print("coro2的锁释放了")

    async def coro3(lock):

        print("并行中,coro3等待锁")

        try:

            print("coro3没有加锁加试图释放")

            lock.release()

        except RuntimeError as e:

            print("触发RuntimeError的错误")

    import time

    async def main(loop):

        lock= asyncio.Lock()

        loop.call_later(0.1, partial(unlock, lock))

        print("等待协程")

        await asyncio.wait([coro1(lock), coro2(lock), coro3(lock)])

    if __name__== '__main__':

        loop= asyncio.get_event_loop()

        try:

            loop.run_until_complete(main(loop))

        finally:

            loop.close()

    结论:

    lock.acquire需要使用await。

    lock.release不需要加await。

    在加锁之前coro1和coro2 ,coro3是并发执行的。

    锁有两种使用方式和像coro1一样通过async with 异步上下文关键字进行锁定,还可以通过coro2那种通过await方式使用acquire加锁,结束的时候使用release释放锁。

    如果没有使用acquire进行加锁,就试图使用release去释放,将触发RuntimeError的异常,像coro3协程一样。


    2.事件(Event)

    asyncio.Event基于threading.Event。允许多个消费者等待某个事件发生,而不必寻找一个特定值与通知关联。

    发现好像和lock也没啥区别。其实区别的话就是一旦触发了事件,coro1和coro2协程就会立即启动,不需要得到事件对象上的唯一的锁了。

    import asyncio

    import functools

    def callback(event):

        print('callback中设置event')

        event.set()

    async def coro1(name, event):

        print(f'{name}等待事件')

        await event.wait()

        print(f'{name}触发')

    async def coro2(name, event):

        print(f'{name}等待事件')

        await event.wait()

        print(f'{name}触发')

    async def main(loop):

        event= asyncio.Event()

        print(f'当前事件状态: {event.is_set()}')

        loop.call_later(0.1, functools.partial(callback, event) )

        await asyncio.wait([coro1('coro1', event), coro2('coro2', event)])

        print(f'当前事件状态: {event.is_set()}')

    if __name__== '__main__':

        loop= asyncio.get_event_loop()

        loop.run_until_complete(main(loop))

        loop.close()


    3)条件(condition)

    Condition的做法与Event类似,只不过不是通知所有的协程等待的协程,被唤醒的等待协程的数目由notify的一个参数控制。

    import asyncio

    async def consumer(cond, name, second):

        await asyncio.sleep(second)

        async with cond:

            await cond.wait()

            print(f'{name}:资源可供消费者使用')

    async def producer(cond):

        await asyncio.sleep(2)

        for nin range(1, 3):

            async with cond:

                print(f'唤醒消费者 {n}')

                cond.notify(n=n)

                await asyncio.sleep(0.1)

    async def producer2(cond):

        await asyncio.sleep(2)

        with await cond:

            print('让资源变的可用')

            cond.notify_all()

    async def main(loop):

        condition= asyncio.Condition()

        task= loop.create_task(producer(condition))

        consumers= [consumer(condition, name, index) for index, namein enumerate(('c1', 'c2'))]

        print(consumers)

        await asyncio.wait(consumers)

        task.cancel()

        task= loop.create_task(producer2(condition))

        consumers= [consumer(condition, name, index) for index, namein enumerate(('c1', 'c2'))]

        await asyncio.wait(consumers)

        task.cancel()

    if __name__== '__main__':

        loop= asyncio.get_event_loop()

        loop.run_until_complete(main(loop))

        loop.close()

    对上面的代码做简单的分析

    使用notify方法挨个通知单个消费者

    使用notifyall方法一次性的通知全部消费者

    由于producer和producer2是异步的函数,所以不能使用之前calllater方法,需要用createtask把它创建成一个任务,或者asyncio.ensurefuture.


    4) 队列(Queue)

    asyncio.Queue为协程提供了一个先进先出的数据结构,这与线程queue.Queue或进程的multiprocess,Queue很类似。这里直接上一个aiohtpp爬虫使用的例子

    相关文章

      网友评论

          本文标题:异步操作之asyncio(二)

          本文链接:https://www.haomeiwen.com/subject/lyojwqtx.html