美文网首页Python
Python并发编程之协程/异步IO

Python并发编程之协程/异步IO

作者: jacke121 | 来源:发表于2017-09-24 15:57 被阅读310次

    原文:http://www.jianshu.com/p/4e048726b613

    引言

    随着node.js的盛行,相信大家今年多多少少都听到了异步编程这个概念。Python社区虽然对于异步编程的支持相比其他语言稍显迟缓,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await语法层面的支持,刚正式发布的Python3.6中asyncio也已经由临时版改为了稳定版。下面我们就基于Python3.4+来了解一下异步编程的概念以及asyncio的用法。

    什么是协程

    通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的,对于计算型任务由于GIL的存在我们通常使用多进程来实现,而对与IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

    其实对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的“并发”,协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。Python中的asyncio也是基于协程来进行实现的。在进入asyncio之前我们先来了解一下Python中怎么通过生成器进行协程来实现并发。

    example1

    我们先来看一个简单的例子来了解一下什么是协程(coroutine),对生成器不了解的朋友建议先看一下Stackoverflow上面的这篇高票回答

    >>> def coroutine():...reply =yield'hello'...yieldreply

    ...

    >>>c = coroutine()

    >>>next(c)'hello'

    >>>c.send('world')'world'

    example2

    下面这个程序我们要实现的功能就是模拟多个学生同时向一个老师提交作业,按照传统的话我们或许要采用多线程/多进程,但是这里我们可以采用生成器来实现协程用来模拟并发。

    fromcollectionsimportdeque

    def student(name, homeworks):

    forhomeworkinhomeworks.items():

    yield(name, homework[0], homework[1])# 学生"生成"作业给老师

    class Teacher(object):

    def __init__(self, students):

    self.students = deque(students)

    def handle(self):

    """老师处理学生作业"""

    whilelen(self.students):

    student = self.students.pop()

    try:

    homework = next(student)

    print('handling', homework[0], homework[1], homework[2])

    exceptStopIteration:

    pass

    else:

    self.students.appendleft(student)

    下面我们来调用一下这个程序。

    Teacher([

    student('Student1', {'math':'1+1=2','cs':'operating system'}),

    student('Student2', {'math':'2+2=4','cs':'computer graphics'}),

    student('Student3', {'math':'3+3=5','cs':'compiler construction'})

    ]).handle()

    这是输出结果,我们仅仅只用了一个简单的生成器就实现了并发(concurrence),注意不是并行(parallel),因为我们的程序仅仅是运行在一个单线程当中。

    handling Student3 cs compiler construction

    handling Student2 cs computer graphics

    handling Student1 cs operating system

    handling Student3 math 3+3=5

    handling Student2 math 2+2=4

    handling Student1 math 1+1=2

    使用asyncio模块实现协程

    从Python3.4开始asyncio模块加入到了标准库,通过asyncio我们可以轻松实现协程来完成异步IO操作。

    解释一下下面这段代码,我们自己定义了一个协程display_date(num, loop),然后它使用关键字yield from来等待协程asyncio.sleep(2)的返回结果。而在这等待的2s之间它会让出CPU的执行权,直到asyncio.sleep(2)返回结果。gather()或者wait()来返回future的执行结果。

    # coroutine.pyimportasyncioimportdatetime

    @asyncio.coroutine  # 声明一个协程def display_date(num, loop):

    end_time = loop.time() +10.0

    whileTrue:

    print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

    if(loop.time() +1.0) >= end_time:

    break

    yieldfromasyncio.sleep(2)# 阻塞直到协程sleep(2)返回结果

    loop = asyncio.get_event_loop()# 获取一个event_loop

    tasks = [display_date(1, loop), display_date(2, loop)]

    loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

    loop.close()

    下面是运行结果,注意到并发的效果没有,程序从开始到结束只用大约10s,而在这里我们并没有使用任何的多线程/多进程代码。在实际项目中你可以将asyncio.sleep(secends)替换成相应的IO任务,比如数据库/磁盘文件读写等操作。

    ziwenxie :: ~ » python coroutine.py

    Loop:1Time:2016-12-1916:06:46.515329

    Loop:2Time:2016-12-1916:06:46.515446

    Loop:1Time:2016-12-1916:06:48.517613

    Loop:2Time:2016-12-1916:06:48.517724

    Loop:1Time:2016-12-1916:06:50.520005

    Loop:2Time:2016-12-1916:06:50.520169

    Loop:1Time:2016-12-1916:06:52.522452

    Loop:2Time:2016-12-1916:06:52.522567

    Loop:1Time:2016-12-1916:06:54.524889

    Loop:2Time:2016-12-1916:06:54.525031

    Loop:1Time:2016-12-1916:06:56.527713

    Loop:2Time:2016-12-1916:06:56.528102

    在Python3.5中为我们提供更直接的对协程的支持,引入了async/await关键字,上面的代码我们可以这样改写,使用async代替了@asyncio.coroutine,使用了await代替了yield from,这样我们的代码变得更加简洁可读。

    importasyncioimportdatetime

    asyncdef display_date(num, loop):# 声明一个协程

    end_time = loop.time() +10.0

    whileTrue:

    print("Loop: {} Time: {}".format(num, datetime.datetime.now()))

    if(loop.time() +1.0) >= end_time:

    break

    awaitasyncio.sleep(2)# 等同于yield from

    loop = asyncio.get_event_loop()# 获取一个event_loop

    tasks = [display_date(1, loop), display_date(2, loop)]

    loop.run_until_complete(asyncio.gather(*tasks))# 阻塞直到所有的tasks完成

    loop.close()

    asyncio模块的其他方法

    开启事件循环有两种方法,一种方法就是通过调用run_until_complete,另外一种就是调用run_forever。run_until_complete内置add_done_callback,使用run_forever的好处是可以通过自己自定义add_done_callback,具体差异请看下面两个例子。

    run_until_complete()

    importasyncio

    asyncdef slow_operation(future):

    awaitasyncio.sleep(1)

    future.set_result('Future is done!')

    loop = asyncio.get_event_loop()

    future = asyncio.Future()

    asyncio.ensure_future(slow_operation(future))

    print(loop.is_running())# False

    loop.run_until_complete(future)

    print(future.result())

    loop.close()

    run_forever()

    run_forever相比run_until_complete的优势是添加了一个add_done_callback,可以让我们在task(future)完成的时候调用相应的方法进行后续处理。

    importasyncio

    asyncdef slow_operation(future):

    awaitasyncio.sleep(1)

    future.set_result('Future is done!')

    def got_result(future):

    print(future.result())

    loop.stop()

    loop = asyncio.get_event_loop()

    future = asyncio.Future()

    asyncio.ensure_future(slow_operation(future))

    future.add_done_callback(got_result)try:

    loop.run_forever()finally:

    loop.close()

    这里还要注意一点,即使你调用了协程方法,但是如果事件循环没有开启,协程也不会执行,参考官方文档的描述,我刚被坑过。

    Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There aretwobasic ways to start it running: call await coroutine or yield from coroutine from another coroutine (assuming the other coroutine is already running!), or schedule its execution using theensure_future()function or theAbstractEventLoop.create_task()method. Coroutines (and tasks) can only run when the event loop is running.

    Call

    call_soon()

    importasyncio

    def hello_world(loop):

    print('Hello World')

    loop.stop()

    loop = asyncio.get_event_loop()

    # Schedule a call to hello_world()

    loop.call_soon(hello_world, loop)

    # Blocking call interrupted by loop.stop()

    loop.run_forever()

    loop.close()

    下面是运行结果,我们可以通过call_soon提前注册我们的task,并且也可以根据返回的Handle进行cancel。

    Hello World

    call_later()

    importasyncioimportdatetime

    def display_date(end_time, loop):

    print(datetime.datetime.now())

    if(loop.time() +1.0) < end_time:

    loop.call_later(1, display_date, end_time, loop)

    else:

    loop.stop()

    loop = asyncio.get_event_loop()

    # Schedule the first call to display_date()

    end_time = loop.time() +5.0

    loop.call_soon(display_date, end_time, loop)

    # Blocking call interrupted by loop.stop()

    loop.run_forever()

    loop.close()

    改动一下上面的例子我们来看一下call_later的用法,注意这里并没有像上面那样使用while循环进行操作,我们可以通过call_later来设置每隔1秒去调用display_date()方法。

    2016-12-24 19:17:13.421649

    2016-12-24 19:17:14.422933

    2016-12-24 19:17:15.424315

    2016-12-24 19:17:16.425571

    2016-12-24 19:17:17.426874

    Chain coroutines

    importasyncio

    asyncdef compute(x, y):

    print("Compute %s + %s ..."% (x, y))

    awaitasyncio.sleep(1.0)# 协程compute不会继续往下面执行,直到协程sleep返回结果

    returnx + y

    asyncdef print_sum(x, y):

    result =awaitcompute(x, y)# 协程print_sum不会继续往下执行,直到协程compute返回结果

    print("%s + %s = %s"% (x, y, result))

    loop = asyncio.get_event_loop()

    loop.run_until_complete(print_sum(1,2))

    loop.close()

    下面是输出结果

    ziwenxie :: ~ » python chain.py

    Compute 1 + 2 ...

    1 + 2 = 3

    如何将同步的代码改成异步

    结合上面提到的内容下面来小结一下如何将同步的代码改成异步

    同步模型

    def handle(id):

    subject = get_subject_from_db(id)# 1

    buyinfo = get_buyinfo(id)# 2

    change = process(subject, buyinfo)

    notify_change(change)

    flush_cache(id)

    上面是一个典型的同步编程模型,每个步骤必须建立在上一个步骤完成的前提,但是注意到步骤1和步骤2之间并没有任何的关系,所以可以将这两个IO型改成异步的,让两者可以并发进行。

    异步模型

    # 先要将get_subject_from_db, get_buyinfo, process, notify_change修改成协程函数/方法importasyncio

    def handle(id):

    subject = asyncio.ensure_future(get_subject_from_db(id))# 1

    buyinfo = asyncio.ensure_future(get_buyinfo(id))# 2

    results = asyncio.gather(subject, buyinfo)

    change =awaitprocess(results)

    awaitnotify_change(change)

    loop.call_soon(flush_cache(id))

    使用ensure_future, loop.crate_task, Task可以将协程包装成一个Future对象,这里我们选择ensure_future。

    Queue

    在asyncio使用Queue来模拟生产者-消费者模式:

    importasyncioimportrandom

    asyncdef produce(queue, n):

    forxinrange(n):

    # produce an item

    print('producing {}/{}'.format(x, n))

    # simulate i/o operation using sleep

    awaitasyncio.sleep(random.random())

    item = str(x)

    # put the item in the queue

    awaitqueue.put(item)

    asyncdef consume(queue):

    whileTrue:

    # wait for an item from the producer

    item =awaitqueue.get()

    # process the item

    print('consuming {}...'.format(item))

    # simulate i/o operation using sleep

    awaitasyncio.sleep(random.random())

    # Notify the queue that the item has been processed

    queue.task_done()

    asyncdef run(n):

    queue = asyncio.Queue()

    # schedule the consumer

    consumer = asyncio.ensure_future(consume(queue))

    # run the producer and wait for completion

    awaitproduce(queue, n)

    # wait until the consumer has processed all items

    awaitqueue.join()

    # the consumer is still awaiting for an item, cancel it

    consumer.cancel()

    loop = asyncio.get_event_loop()

    loop.run_until_complete(run(10))

    loop.close()

    实战

    by the way:在asyncio中使用requests没有任何意义,requests是基于同步实现的,目前也没有要支持asyncio的动向,如果要充分发回异步的威力,应该使用aiohttp。而且也要合理使用concurrent.futures模块提供的线程池/进程池。

    Asyncio+Aiohttp

    importaiohttpimportasyncioimporttime

    NUMBERS = range(12)

    URL ='http://httpbin.org/get?a={}'

    asyncdef fetch_async(a):

    asyncwithaiohttp.request('GET', URL.format(a))asr:

    data =awaitr.json()

    returndata['args']['a']

    start = time.time()

    event_loop = asyncio.get_event_loop()

    tasks = [fetch_async(num)fornuminNUMBERS]

    results = event_loop.run_until_complete(asyncio.gather(*tasks))

    fornum, resultinzip(NUMBERS, results):

    print('fetch({}) = {}'.format(num, result))

    print('Use asyncio+aiohttp cost: {}'.format(time.time() - start))

    下面是运行结果:

    ziwenxie :: ~ » python example1.py

    fetch(0) =0

    fetch(1) =1

    fetch(2) =2

    fetch(3) =3

    fetch(4) =4

    fetch(5) =5

    fetch(6) =6

    fetch(7) =7

    fetch(8) =8

    fetch(9) =9

    fetch(10) =10

    fetch(11) =11

    Use asyncio+aiohttp cost:0.8980867862701416

    Requests+Pool

    如果使用传统的Requests和ThreadPool/ProcessPool方式的话,由于多线程/多进程之间切换的开销速度会慢了许多。

    importrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

    NUMBERS = range(12)

    URL ='http://httpbin.org/get?a={}'

    def fetch(a):

    r = requests.get(URL.format(a))

    returnr.json()['args']['a']

    start = time.time()withThreadPoolExecutor(max_workers=3)asexecutor:

    fornum, resultinzip(NUMBERS, executor.map(fetch, NUMBERS)):

    print('fetch({}) = {}'.format(num, result))

    print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

    线程池的执行结果:

    ziwenxie :: ~ » python example2.py

    fetch(0) =0

    fetch(1) =1

    fetch(2) =2

    fetch(3) =3

    fetch(4) =4

    fetch(5) =5

    fetch(6) =6

    fetch(7) =7

    fetch(8) =8

    fetch(9) =9

    fetch(10) =10

    fetch(11) =11

    Use requests+ThreadPoolExecutor cost:3.356502056121826

    进程池的执行结果:

    fetch(0) = 0

    fetch(1) = 1

    fetch(2) = 2

    fetch(3) = 3

    fetch(4) = 4

    fetch(5) = 5

    fetch(6) = 6

    fetch(7) = 7

    fetch(8) = 8

    fetch(9) = 9

    fetch(10) = 10

    fetch(11) = 11

    Use requests+ProcessPoolExecutor cost: 3.2979931831359863

    Asyncio+Requests+Pool

    虽然上面提到requests不支持异步,但是在某些情形需要控制event loop中运行在单独的线程/进程中的function会阻塞直到这些function返回结果,这个时候可以结合run_in_executor()和wait()来进行控制。

    p.s:下面这个例子在处理纯IO任务的时候并没有太多的意义,只是为了理解如何在不支持异步的模块中引入异步的概念。

    importasyncioimportrequestsimporttimefromconcurrent.futuresimportThreadPoolExecutor

    NUMBERS = range(12)

    URL ='http://httpbin.org/get?a={}'

    def fetch(a):

    r = requests.get(URL.format(a))

    returnr.json()['args']['a']

    asyncdef run_scraper_tasks(executor):

    loop = asyncio.get_event_loop()

    blocking_tasks = []

    fornuminNUMBERS:

    task = loop.run_in_executor(executor, fetch, num)

    task.__num = num

    blocking_tasks.append(task)

    completed, pending =awaitasyncio.wait(blocking_tasks)

    results = {t.__num: t.result()fortincompleted}

    fornum, resultinsorted(results.items(), key=lambdax: x[0]):

    print('fetch({}) = {}'.format(num, result))

    start = time.time()

    executor = ThreadPoolExecutor(3)

    event_loop = asyncio.get_event_loop()

    event_loop.run_until_complete(

    run_scraper_tasks(executor)

    )

    print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

    结果可想而知与requests+ThreadPoolExecutor执行速度上并没有太多的差别,因为我们的IO任务还是放在对应的子线程中去处理的,只是这里通过wait引入了异步的概念,但是在某些场景可以取得更大自由度程度的控制。

    fetch(0) =0

    fetch(1) =1

    fetch(2) =2

    fetch(3) =3

    fetch(4) =4

    fetch(5) =5

    fetch(6) =6

    fetch(7) =7

    fetch(8) =8

    fetch(9) =9

    fetch(10) =10

    fetch(11) =11

    Use asyncio+requests+ThreadPoolExecutor cost:3.614989995956421

    Semaphore

    爬虫一次性的产生过多的请求账号/IP很快就会被封掉,可以考虑使用Semaphore控制同时的并发量,与我们熟悉的threading模块中的Semaphore(信号量)用法类似。

    importaiohttpimportasyncio

    NUMBERS = range(12)

    URL ='http://httpbin.org/get?a={}'

    sema = asyncio.Semaphore(3)

    asyncdef fetch_async(a):

    asyncwithaiohttp.request('GET', URL.format(a))asr:

    data =awaitr.json()

    returndata['args']['a']

    asyncdef print_result(a):

    with(awaitsema):

    r =awaitfetch_async(a)

    print('fetch({}) = {}'.format(a, r))

    loop = asyncio.get_event_loop()

    f = asyncio.wait([print_result(num)fornuminNUMBERS])

    loop.run_until_complete(f)

    可以到后台看到并发受到了信号量的限制,同一时刻一般只处理三个请求。

    References

    DOCUMENTATION OF ASYNCIO1

    DOCUMENTATION OF ASYNCIO2

    COROUTINES AND ASYNC/AWAIT

    GOLD-XITU1

    GOLD-XITU2

    STACKOVERFLOW

    PyMOTW-3

    500LINES

    相关文章

      网友评论

      本文标题:Python并发编程之协程/异步IO

      本文链接:https://www.haomeiwen.com/subject/vwwkextx.html