美文网首页python程序员python热爱者
Python3 异步协程函数async具体用法

Python3 异步协程函数async具体用法

作者: 予岁月以文明 | 来源:发表于2017-11-09 11:58 被阅读5168次

    之前使用Python的人往往纠缠在多线程,多进程,评判哪个效率更高?
    其实,相对于别家的协程和异步,不管多线程还是多进程效率都要被吊打,多线程之间切换耗费cpu寄存器资源,OS 调度的不太可控,多进程间通信不便的问题。
    后来Python改进了语法,引入了yiled from充当协程调度,后来有人根据这个新特性开发了第三方协程框架,Tornado,Gevent等。
    在这场效率之争里,Python这么受欢迎的语言,官方怎么能默不出声?所以Python之父深入简出3年,苦心钻研自家的协程,async/await和asyncio库,并放到Python3.5后成为远程原生的协程,
    对于类似爬虫这种延时的IO操作,协程是个大利器,优点很多,他可以在一个阻塞发生时,挂起当前程序,跑去执行其他程序,把事件注册到循环中,实现多程序并发,据说超越了10k限制,不过我没有试验过极限。
    现在讲一讲协程的简单的用法,当你爬一个网站,有100个网页,正常是请求一次,回来一次

    for url in urls:
      response=get(url)
      results=parse(response)
    

    这样效率很低,但协程可以一次发起100个请求(其实也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,挂起100个,然后同时等待100个返回,效率提升了100倍。可以理解为同时做100件事,相对于多线程,做到了由自己调度而不是交给CPU,程序流程可控,节约资源,效率极大提升。

    async def get(url):#定义协程抓取函数,这里用了aiohttp库
      
        async with aiohttp.ClientSession() as session:#协程上下文
            async with session.get(url) as response:
                return await response.text()
    #await 是挂起命令,挂起当前,执行response.text(),response.text()执行完成后重新激活当前函数继续运行,返回。
    
    

    如果response.text迟迟不回,程序不会死等,而是去你定义的任务循环中寻找另一个任务(如果有的话),如果没有循环任务,那就死等咯。。。毕竟总要有等返回结果的。
    所以实现协程就是要实现多个任务的循环。用一张简单的图表示。


    timg.jpg

    也就是说任务一直跑,每到一个地方awit一次,然后await返回,直到最终全部返回,主程序结束。

    调用协程
    协程不能直接运行,需要把协程加入到事件循环(loop)。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

    import time
    import asyncio
    
    now = lambda : time.time()
    async def do_some_work(x):
        print('Waiting: ', x)
    start = now()
    coroutine = do_some_work(2)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    print('TIME: ', now() - start)
    

    结果

    Waiting:  2
    TIME:  0.00016564312531
    

    关于task

    协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类。保存了协程运行后的状态,用于未来获取协程的结果。

    import asyncio
    import time
    
    now = lambda : time.time()
    
    async def do_some_work(x):
        print('Waiting: ', x)
    
    start = now()
    
    coroutine = do_some_work(2)
    loop = asyncio.get_event_loop()
    # task = asyncio.ensure_future(coroutine)
    task = loop.create_task(coroutine)
    print(task)
    loop.run_until_complete(task)
    print(task)
    print('TIME: ', now() - start)
    

    输出结果为:

    <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>
    Waiting:  2
    <Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>
    TIME:  0.0003490447998046875
    

    创建task后,task在加入事件循环之前是pending状态,当loop事件开始循环,所有pending状态的task都开始执行到await那一步(函数体内并非如此),不管loop里是否调用

    asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。isinstance(task, asyncio.Future)将会输出True。

    就是这么个道理,协成里面也可以放yield 表达式,变成生成器式协程,可以使用 async for x in 生成式协程。比如

    async def duo():
        print('多多')
        await asyncio.sleep(4)
        eiei=yield 666
        print(eiei)
    

    要想取到duo函数弹出的666,需要用

    async for i in duo():
        i+=100
        print(i)
    

    再举一个例子,典型的消费,生产者模型,说是买土豆,实际是拿字典,来源知乎,我做了修改。

    from time import sleep
    from random import randint,random
    import asyncio
    all_potatos={x:randint(1,50) for x  in "abcd"}
    
    async def take_potatos(num):
        count = 0
        while True:
            if len(all_potatos) == 0:
                await ask_for_potato()
            else:
                potato = all_potatos.popitem()
                yield potato
                count += 1
                if count == num:
                    break
    
    async def buy_potatos():
        bucket = []
        async for p in take_potatos(20):
            bucket.append(p)
            print(bucket)
    
    async def ask_for_potato():
        await asyncio.sleep(3)
        all_potatos.update({x:randint(1,20) for x  in "临兵斗者皆阵列在前"})
    
    def main():
        loop=asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait([buy_potatos()])  )
        loop.close()
    
    if __name__ == '__main__':
        main()
    

    在实际生产环境中,最常见的依然是生产-消费模型,用队列串联,在任务开始就初始化多个worker,用join方法挂起任务,让worker无限循环,get队列里的值。直到队列为空。以下这个例子。

    import time
    import asyncio
    from asyncio import Queue
    
    
    def now(): return time.time()
    
    
    async def worker(q):#工作者消费队列
        print('Start worker')
    
        while 1:#无限循环
            start = now()
            task = await q.get()#开始消费
            if not task:
                await asyncio.sleep(1)
                continue
            print('working on ', int(task))
            await asyncio.sleep(int(task))
            q.task_done()#队列通知
            print('Job Done for ', task, now() - start)
    
    
    async def generate_run(q):#生成worker线程函数
        asyncio.ensure_future(worker(q))
        asyncio.ensure_future(worker(q))#先弄了两个worker去跑
        await q.join()主线程挂起等待队列完成通知
        jobs = asyncio.Task.all_tasks()完成后收集所有线程,这里是3个,算上自己
        print('是否已经关闭任务', asyncio.gather(*jobs).cancel())#关闭线程方法,返回True
    
    
    def main():
    
        loop = asyncio.get_event_loop()
        q = Queue()
        for i in range(3):
            q.put_nowait(str(i))#一定要放入字符,数字0是空,队列一直不会结束。
        loop.run_until_complete(generate_run(q))#启动生成函数
    
        loop.close()
    
    
    if __name__ == '__main__':
        main()
    
    
    

    相关文章

      网友评论

        本文标题:Python3 异步协程函数async具体用法

        本文链接:https://www.haomeiwen.com/subject/gakemxtx.html