阻塞(Block)
:进群:960410445 即可获取数十套PDF~!
阻塞调用是指调用结果返回之前,当前线程被挂起。(其线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即现场暂停运行,将计算资源让给其他活动线程,当I/O操作结束,改线程阻塞状态解除,重新变成活动线程,继续争夺CPU资源)函数只有在得到结果之后才返回。和同步调用不同的是,同步调用很多时候线程还是激活的,只是从逻辑上当前函数还没有返回而已。
非阻塞(Unblock) :非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
并行和并发
简单来说,并行是指两个或者多个事件在同一时刻发生,通常是当系统有一个以上CPU或CPU核心时,才有可能并行。两个或者多个事件、线程并步抢占CPU资源。而并发是指两个或者多个事件在同一时间间隔内发生,在一定时间间隔内,有多个程序在执行,但在同一时刻,只有一个程序在执行。
生成器和迭代器
通过一个简单的例子理解一下:
生成一个斐波那契數列,输出前N个:
进群:960410445 即可获取数十套PDF!
上述代码通过类的形式将函数封装为一个可迭代对象。通过next方法在循环的时候每次去取一个数,只有在需要使用的时候才会生成,内存占用很小。但是,上述代码较为繁琐,在Python中,有一种语法糖能简化,那就是 yield 。
yield 语法糖
调用和
class 版的完全一致,也可以使用 next 方法等。简单的说, yield 的作用就是把一个函数变为一个 generator ,带有
yield 的函数不再是一个普通函数, Python 解释器会将器视为 generator 。在 for 循环执行时,每次循环都会执行 fab
函数内部的代码,执行到 yield 时。函数就返回一个迭代值,下次迭代时,就从 yield 的下一句继续执行。调用next也是同理。
当函数执行结束时,会抛出 StopIteration 异常,表示迭代完成。
迭代器
Python
中,一个可以被用来 for 循环的对象可统称为可迭代对象。可以用 isinstance() 判断一个对象是否为可迭代对象 (Iterable)
。生成器不但可以作用于 for 循环,还可以被 next 函数不断调用返回下一个值,最后抛出异常。而迭代器可以被 next()
函数调用并不断返回下一个值的对象称为迭代器( Iteratot )对象。
生成器和协程
从语法上讲,生成器是一个带yield语句的函数。协程,又称 微线程 , 纤程 ,英文 Coroutine 。先看看协程的概念:
协程通过允许多个入口点在某些位置暂停和恢复执行来概括用于非抢占式多任务的子程序。
从某些角度来理解,协程其实就是一个可以暂停执行的函数,并且可以恢复继续执行。我们知道,
yield 关键字已经可以暂停执行,如果在暂停后有方法把一些值发送回到暂停窒息的函数中,那么便就可以理解为协程。在Python PEP 342
中,添加了“把东西发回已经暂停的生成器中”的方法,这个方法就是 send() 。
生成器和协程都是通过 python 中 yield 的关键字实现的,不同的是,生成器只调用 next 来不断的生成数据,而协程会调用 next 和 send 来返回结果和接收参数。
其中 next(sfib) 相当于 sfib.send(None) ,可以使得 sfib 运行至第一个 yield 处返回。后续将一个随机的秒数发送给 sfib ,作为当前中断的 yield 表达式的返回值。
yield 表达式的作用包含了三个步骤:
1、 向函数外抛出值
2、 暂停,等待 next() 或 send() 恢复。
3、 赋值,接受 send 发送过来的数据。
结果:
['A', 'B', 0, 1, 2]
['A', 'B', 0, 1, 2]
当 yiled from 后面加上一个生成器之后,就实现了生成的嵌套。实现生成器的嵌套,不一定要使用 yield from ,但它可以让我们避免让自己处理各种料想不到的异常。如果自己去实现,会加大编码的难度。
yield from 的主要功能是打开双向通道,把最外层的调用与最内层的子生成器连接起来,这样二者就可以直接发送和产出值,还可以直接穿入异常。
委派生成器在 yied from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出值发给调用方,子生成器返回之后,解释器会抛出 StopIteration 异常。
委托生成器的作用就是:在调用方与子生成器之间建立一个双向通道。
为什么一定要使用 yield from 语句呢:
在使用 yiled from 语句时,语句为我们已经处理了很多的异常:
greenlet 和 gevent
greenlet
可以实现协程,不过每一次都要人为去指向下一个该执行的协程。 greenlet 可以从一个协程切换到任意其他协程,但必须保证 greenlet
的正常结束,在协程之间的任意切换很容易出现问题。 greelet 是 Stackless 发展来的 Cpython 扩展包, greelet
是底层实现了原生协程的C扩展库。
使用 greenlet 实现的 生产者-消费者 模型:
# 基于greenlet的生产者消费者协程
from greenlet import greenlet
import random
import time
def Producer():
while True:
item = random.randint(1, 10)
print("生产<{}>中...".format(item))
time.sleep(1)
c.switch(item) # 切换到消费者,并将item传入。
def Consumer():
while True:
item = p.switch() # 切换到生产者。等待生产者传递参数item
print("消费<{}>中..".format(item))
c = greenlet(Consumer) # 将普通函数编程协程
p = greenlet(Producer) # 同理
c.switch() # 启动协程,Consumer先执行
"""
从consumer开始执行,执行到item=p.switch()时,程序切换到producer,并等待传参
producer得到执行权后,生成一个item,并往下执行代码
当producer执行到c.switch(item)时,程序携带传递的item切换到consumer,
consumer继续往下执行,直到下一次运行到p.switch时,交出执行权,切换到producer,重复以上过程
"""
greenlet 的价值在于高性能的原生协程,且语义更加明确、显式切换,执行到 switch 时就切换程序
直接将函数包装成协程,可以保留原代码的风格
gevent
gevent
是实现协程的第三方库,通过封装 greenlet , epoll 回调编程模式,生成器协程实现。当遇到 IO
操作时,就自动切换到其他协程,等到 IO 操作完成,再在适当的时候切换回来继续执行。 gevent
会自动切换协程。就保证总有协程在执行,而不是等待 IO 。由于切换实在 IO 操作时自动完成。所以 gevent 需要修改 Python
的自带的一些保准库,这一过程在启动时通过 monkey patch 完成。
"""
gevent: 通过greenlet实现协程,核心就是遇到IO操作,会自动切换到其他协程
"""
# 将python标准库中的一些阻塞操作变为非阻塞
from gevent import monkey;monkey.patch_all()
# 使用猴子补丁要写在第一行
import gevent
def test1():
print("test1")
gevent.sleep(0) # 模拟耗时操作
print("test11")
def test2():
print("test2")
gevent.sleep(0) # 模拟耗时操作
print("test22")
# g1 = gevent.spawn(test1) # 将函数封装成协程,并启动
# g2 = gevent.spawn(test2)
# gevent.joinall([g1, g2])
"""
# joinall() 阻塞当前流程,执行给定的greenlet(列表中的对象),等待程序执行完
# spawn是启动协程,参数为函数名及其参数
运行结果:
test1
test2
test11
test22
代码执行test1,打印test1,遇到gevent.sleep(0)时切换程序,执行test2
test()执行,打印test2,执行到gevent.sleep(0)时切换程序
执行test1在gevent.sleep(0)后面的代码,直到再次遇到gevent时,切换程序
然后在test2中,继续执行gevent后的代码,直到遇到gevent时,再次切换
直到程序执行完毕
"""
gevent的价值在于它的使用基于epoll的libev来避开阻塞;使用基于gevent的高效协程,来切换执行
只在遇到阻塞的时候切换,没有轮询和线程开销。
asyncio.coroutine
在 Python3.4 中加入了 asyncio 库,使得 Python 获得了事件循环的特性,但这个还是以生成器对象为基础, yield from 在 asyncio 模块中很常用。通过 asnyncio+生成器 ,我们可以实现这样一个异步的模型:
import asyncio
@asyncio.coroutine
def counttdown(number, n):
while n > 0:
print("T-minus", n, "({})".format(number))
yield from asyncio.sleep(1)
n -= 1
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(counttdown("A", 2)),
asyncio.ensure_future(counttdown("B", 5)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
这里
asyncio.coroutine 装饰器是用来标记这个函数是一个协程,因为 asyncio 要求所有用作协程的生成器必须由
asyncio.coroutine 装饰。这段代码中,事件循环会启动两个 countdown() 协程,它们会一直执行,知道遇到 yield
from asyncio.sleep() ,暂停执行,并将一个 async.Future 对象返回给事件循环。事件循环会监控这个
asyncio.Future 对象,一旦执行完成后,会将这个 Future 的执行结果返回给刚刚因为这个 Futur
e暂停的协程,并且继续执行原协程。
event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
协程对象不能直接运行,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成一个任务( task )对象。 task 对象是 Future 的子类,保存了协程运行后的状态,用于未来获取协程的结果。
在上面的代码中, asyncio.sleep 中,创建了一个 Futrure 对象,作为更内层的协程对象,通过 yield from 交给了事件循环,而 Future 是一个实现了 __iter__ 对象的生成器。
@coroutine
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
future = futures.Future(loop=loop)
h = future._loop.call_later(delay,
future._set_result_unless_cancelled, result)
try:
return (yield from future)
finally:
h.cancel()
class Future:
#blabla...
def __iter__(self):
if not self.done():
self._blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
当协程 yield from asyncio.sleep 时,事件循环其实是与 Future 对象建立了联系。程序运行结果如下:
async 和 await
在
Python3.5 中引入了 async 和 await ,可以将它们理解为 asyncio.coroutine / yield from
的完美替身, async/await 让协程表面上独立于生成器而存在,将细节隐藏于 asyncio 模块之下。使用 await
可以针对耗时的操作进行挂起,类似于生成器里的 yield 一样,使函数让出控制权。协程遇到 await
,事件循环挂起该协程,直到其他协程也挂起或者执行完毕,再进行下一个协程的执行。耗时的操作一般是一些 IO
操作,如网络请求,文件读取等。这里可以使用 asyncio.sleep 来进行模拟举例:
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
在 sleep 的时候,使用 await 让出控制权。当遇到阻塞调用的函数的时候,使用 await 方法将协程的控制权让出,以便 loop 调用其他的协程。
注意的区别是:
await 接受的对象必须是一个 awaitable 的对象,所谓 awaitable 的对象,就是一个实现了 __await()__
方法的对象,而且这个方法必须返回一个不是协程的迭代器。在 Python3.6 中, yield 和 await
也可以在同一个函数中使用,初次之外,也可以在列表推导等地方使用 async for 或 await 语法。
result = [i async for i in aiter() if i % 2]
result = [await func() for fun in funcs if await condition()]
async def test(x, y):
for i in range(y):
yield i
await asyncio.sleep(x)
协程与异步
与多线程编程不同的是,多个协程总是运行在同一个线程中,一旦其中的一个协程发生阻塞行为,进而所有的协程都无法继续运行。例如在我们进行爬虫编写时,习惯使用 requests 库,而这个库就是阻塞的。尝试使用协程的方式进行编写:
import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447
而不使用协程,使用普通方式,也是这个时间。为什么会这样呢,究其原因是
requests 并不支持异步操作。在运行时阻塞并未挂起。另外 await 后面所跟的对象必须是:一个原生 coroutine 对象,一个由
types.coroutine 装饰的生成器,这个生成器可以返回 coroutine 对象。而 requests
返回的对象不符合上述条件。为了程序运行不报错,上面代码在 await 时对 requsts 进行了一次 async
函数的包装,但是它并不是“原生的coroutine对象”,因此也就不能真正异步了。
可以通过使用实现了异步的 aiohttp 或者 Trip 库改写上述爬虫。
import asyncio
import time
import aiohttp
from spider_normal import targets, show_results
final_results = {}
async def get_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
return len(content)
async def spider(url):
length = await get_content(url)
final_results[url] = length
return True
def main():
loop = asyncio.get_event_loop()
cor = [spider(url) for url in targets]
start_time = time.time()
result = loop.run_until_complete(asyncio.gather(*cor))
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
print("loop result: ", result)
if __name__ == '__main__':
main()
可以看到,运行时间大幅度降低。
总结
这篇文章记录自己对协程的一些简单理解和认知,某些细节较为简浅粗糙,协程目前在见到的项目中解除不多,更多的是使用多进程或多线程的方式去处理。通过这篇文章,可以看到,协程相对我们熟悉的线程,多进行还是有点难理解的,特别是协程内部处理以及异步操作原理,逻辑较为分散。希望以后的项目中能用到协程去处理问题。
网友评论