美文网首页Fluent Python
使用 asyncio 包处理并发

使用 asyncio 包处理并发

作者: 一块大番薯 | 来源:发表于2018-03-20 10:33 被阅读108次

并发与并行(Concurrency、Parallelism)

  • 真正的并行需要多个核心
  • 例如 4 个核心 CPU,最多只能 4 并行
    如果每个核心运行 3 个线程,则 4 并行,12 并发,12 线程
  • 并发更有用

asyncio 包

  • 使用协程(事件循环驱动)实现并发
  • 巧妙的回调:
    yield from ... 变成 yield From(...)、return result 变成 raise Return(result)

使用线程实现并发(threading 模块)

Python 没有提供终止线程的 API,所以要关闭线程,必须给线程发送信息
此处利用 signal.go 属性,从外部控制线程。

import sys
import time
import threading
import itertools


class Signal:
    go = True


def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))  # 使用退格符(\x08)把光标移回来
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status)) 


def slow_func():
    time.sleep(3)
    return 42


def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_func()
    signal.go = False
    spinner.join()  # 等待 spinner 线程结束
    return result


def main():
    result = supervisor()
    print('Answer:', result)


if __name__ == '__main__':
    main()

使用协程实现并发(asyncio 模块)

  • 严格的协程定义:
    asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield

  • 适合 asyncio 的协程要由调用方通过 yield from 调用
    或者把协程传给 asyncio 包中某个函数

  • @asyncio.coroutine 装饰器应该应用在协程上。其一凸显协程,其二有助调试(即如果协程没有产生值,则被垃圾回收,发出warning)。但不会预激协程。

  • yield from 把控制权交给事件循环。而类似 time.sleep(.1) 反而阻塞事件循环

import sys
import asyncio
import itertools


@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield  from asyncio.sleep(.1)  # 协程 asyncio.sleep 通过 yield from 调用
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_func():
    yield from asyncio.sleep(3) # 把控制权交给主循环
    return 42


@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!')) # 协程 spin 通过 asyncio.async 调用
    print('spinner object:', spinner)  # Task 对象,类似 Thread 对象
    result = yield from slow_func()  # 协程 slow_func 通过 yield from 调用
    spinner.cancel()
    return result


def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())  # 驱动协程
    loop.close()
    print('Answer:', result)


if __name__ == '__main__':
    main()

对比线程和协程:

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_func()
    signal.go = False
    spinner.join()  
    return result

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!')) 
    print('spinner object:', spinner)
    result = yield from slow_func()  
    spinner.cancel()
    return result
  • asyncio.Task 和 threading.Thread 等效
    前者驱动协程 spin,后者调用可调用对象 spin
  • Task 对象可由 asyncio.async(coroutine) 或 loop.create_task(...) 创建
  • Task 对象创建时自动排定运行时间,而 Thread 对象需调用 start 方法
  • 线程终止靠外部控制
    任务终止可使用 Task.cancel 方法,在协程内部抛出 CanceledError
  • 联系:阻塞的操作通过协程来实现。而协程会把控制权交给主循环。
    如 time.sleep(3) 替换成 asyncio.sleep(3)
    又如 requests.get(url) 替换成 aiohttp.request('GET', url)

相关文章

网友评论

    本文标题:使用 asyncio 包处理并发

    本文链接:https://www.haomeiwen.com/subject/cvowaxtx.html