# 1. Use greenlet to print 1 2 3 4 in order via explicit switching.
# (Fix: this header line had no leading '#', which made the whole file a SyntaxError.)
from greenlet import greenlet
def func1():
    print(1)
    g2.switch()  # hand control to func2
    print(3)
    g2.switch()
def func2():
    print(2)
    g1.switch()  # hand control back to func1
    print(4)
g1 = greenlet(func1)
g2 = greenlet(func2)
g1.switch()  # kick off the ping-pong in func1
# -----------------------------------------------------------------------
# 2.gevent是一个自动切换耗时的协程库,而且现在一般都是用asyncio,gevent比greenlet高级点,因为可以自动切换耗时的io
# import gevent
# def task_1(num):
# for i in range(num):
# print(gevent.getcurrent(), i)
# gevent.sleep(i) # 模拟一个耗时操作,注意不能使用time模块的sleep
#
#
# if __name__ == "__main__":
# g1 = gevent.spawn(task_1, 5) # 创建协程
# g2 = gevent.spawn(task_1, 6)
# g3 = gevent.spawn(task_1, 7)
#
# g1.join() # 等待协程运行完毕
# g2.join()
# g3.join()
# ----------------------------------------------------------------
# 3.由于gevent是自动切换io,它修改了一些python标准库,所以需要给它打上补丁。
# from gevent import monkey
# import gevent
# import time
#
#
# def task_1(name):
# for i in range(5):
# print(name, i)
# time.sleep(1) # 协程遇到耗时操作后会自动切换其他协程运行
#
#
# def task_2(name):
# for i in range(3):
# print(name, i)
# time.sleep(1)
#
#
# if __name__ == "__main__":
# monkey.patch_all() # 给所有的耗时操作打上补丁
#
# gevent.joinall([ # 等到协程运行完毕
# gevent.spawn(task_1, "task_1"), # 创建协程
# gevent.spawn(task_2, "task_2")
# ])
# print("the main thread!")
# -------------------------------------------------------------------
# 4.补充一个知识点,关于partial函数
# 需要注意的是,传中间的参数给partial会出错,感觉没啥卵用
# from functools import partial
#
# def hello_world(a,b,c):
# print("a",a)
# print("b",b)
# print("c",c)
#
# hello_world(1,2,3)
# partial_hello_world=partial(hello_world,c=4)
# partial_hello_world(6,5)#这就是c这个参数被固定住了。
# ---------------------------------------
# 5.用yield实现协程,打印1234
# Implement the same ordered 1-2-3-4 printing with plain generators.
def func1():
    yield 1
    # Drain the sub-generator by hand; for simple iteration this is
    # exactly equivalent to `yield from func2()`.
    for value in func2():
        yield value
    yield 4
def func2():
    yield 2
    yield 3
a = func1()
for value in a:
    print(value)
# Generator .send(): resumes the generator once and binds the sent value to
# the paused `yield` expression. The first resume must be send(None) (or
# next()) because execution has not reached a yield yet, and resuming an
# exhausted generator raises StopIteration exactly like next() does.
def generate():
    i = 0
    while i < 5:
        print("我在这。。")
        xx = yield i  # `=` evaluates the right side first, so xx only gets a value on resume
        print(xx)
        i += 1
g = generate()
print(g.send(None))  # the very first send must carry None
print(g.send("la"))
print(g.send("li"))
print(g.send("li"))
print(g.send("li"))
try:
    print(g.send("li"))
except StopIteration:
    # Bug fix: this 6th send resumes an exhausted generator; without the
    # guard the StopIteration aborted the script and nothing below ever ran.
    pass
# ----------------------------------------
# 6. The modern coroutine library: asyncio.
import asyncio
async def dayin():
    # `async def` means calling this only *builds* a coroutine object.
    print("in you")
result = dayin()  # nothing has executed yet — this is just the coroutine object
asyncio.run(result)  # create an event loop, schedule the coroutine, and drive it
# ------------------------------------------
# 7. `await` accepts three kinds of objects: coroutines, Tasks, and Futures.
import asyncio
async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "haha"
async def main():
    print("this is the main func")
    # Await the coroutine and print its return value in one step.
    print(await others())
asyncio.run(main())
# ---------------------------------------------------------
# 8. Tasks run coroutines concurrently; create_task schedules the coroutine
# on the running loop immediately.
import asyncio
async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
async def main():
    print("main start")
    scheduled = [asyncio.create_task(others()), asyncio.create_task(others())]
    await asyncio.sleep(1)
    print("task创建完成")
    # Await the tasks in creation order, then report their results
    # (others() has no return, so both are None).
    outcomes = [await job for job in scheduled]
    for value in outcomes:
        print(value)
    print("main end")
asyncio.run(main())
# ---------------------------------------------------------------------
# 9. The idiomatic pattern: build Tasks inside the running coroutine and
# hand them to asyncio.wait().
import asyncio
async def ts(n):
    print("I am ts {}, start!".format(n))
    await asyncio.sleep(2)
    print("I am ts {}, end!".format(n))
    return n
async def main():
    print("main start")
    # Note: the Task `name=` keyword only exists on Python 3.8+.
    jobs = [asyncio.create_task(ts(number)) for number in (1, 2)]
    # timeout bounds the wait; on expiry, still-running tasks are returned
    # in `pending` instead of `done`. None (the default) means wait forever.
    done, pending = await asyncio.wait(jobs, timeout=None)
    for finished in done:
        print(finished.result())
    print("main end")
asyncio.run(main())
# -----------------------------------------------------------------
# 10. Variation: build the coroutine list outside main.
# Bare coroutines (not Tasks) are used on purpose here: there is no event
# loop yet when the list is built, so create_task() would fail;
# asyncio.run() creates the loop afterwards.
import asyncio
async def ts(n):
    print("I am ts {}, start!".format(n))
    await asyncio.sleep(2)
    print("I am ts {}, end!".format(n))
    return n
task_list=[
    ts(1),
    ts(2),
]
async def _wait_all(coros):
    # Fix: asyncio.wait() stopped accepting bare coroutines in Python 3.11,
    # so wrap them into Futures once the loop is running.
    return await asyncio.wait([asyncio.ensure_future(c) for c in coros], timeout=None)
done, pending = asyncio.run(_wait_all(task_list))
for i in done:
    print(i.result())
# --------------------------------------------------------------------------------------------------------------------
# 11. The lower-level Future object — one of the things `await` accepts.
# An empty Future blocks its awaiter until someone calls set_result on it.
import asyncio
async def others(fur):
    print("the start!")
    await asyncio.sleep(5)
    fur.set_result("furture is hopeness!")
    print("the end!")
    return 123
async def main():
    loop = asyncio.get_running_loop()
    fur = loop.create_future()
    # Schedule the producer, wait for it to finish, then collect the
    # value it planted in the Future.
    producer = loop.create_task(others(fur))
    await producer
    print(await fur)
asyncio.run(main())
# _______________________________________________________________
# 12. Thread and process pools: relevant here because they have their own
# Future type, which can be bridged to asyncio in some situations.
# multiprocessing also ships a process pool; concurrent.futures has both.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
    time.sleep(1)
    print(value)
pool = ThreadPoolExecutor(max_workers=5)
# or: pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
print(type(fut))  # <class 'concurrent.futures._base.Future'>
# Fix: shut the executor down explicitly so its worker threads are released
# deterministically instead of lingering until interpreter exit.
pool.shutdown(wait=True)
# -------------------------------------------------------------------------
# 13. When a project cannot use coroutine-aware libraries: run the blocking
# call in an executor. run_in_executor wraps the executor's concurrent
# Future into an awaitable asyncio Future.
from concurrent.futures import Future
import time
import asyncio
def fun1():
    # some slow, blocking operation
    time.sleep(2)
    return "sb"
async def main():
    # Fix: get_event_loop() is deprecated inside coroutines — use get_running_loop().
    loop = asyncio.get_running_loop()
    # Fix: submit all ten jobs up front so they overlap in the thread pool;
    # the original awaited each one before submitting the next (10 x 2 s
    # strictly sequential). Printed output is identical: "sb" ten times.
    futs = [loop.run_in_executor(None, fun1) for _ in range(10)]
    for result in await asyncio.gather(*futs):
        print(result)
asyncio.run(main())
# ----------------------------------------------
# 14. Wrapping a library with no async support (requests): run the blocking
# call in the default thread-pool executor and await the wrapped future.
import asyncio
import requests
async def download(url):
    print("开始下载了:",url)
    # Fix: inside a coroutine, get_running_loop() is the non-deprecated call
    # (get_event_loop() only auto-creates a loop when none is running).
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print("下载完成")
    file_name = url.rsplit(r"/")[-1]
    with open(file_name, mode="wb") as file_object:
        file_object.write(response.content)
if __name__ == "__main__":
    url_list = [
        'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJ1bKyMmIcD4KABbCR_8uizsAALIGwOhI2UAFsJf372.jpg',
        'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJlbKyMmIVhpJAAow7jt3ircAALIGwPXXxwACjEG766.jpg',
        'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJlbKyMqINtb4AAcmGciRO9YAALIHAB850UAByYx992.jpg',
    ]
    async def _main():
        # Fix: asyncio.wait() rejects bare coroutines on Python 3.11+ — wrap
        # them in real Tasks. The stray module-level get_event_loop() call
        # (unused, and deprecated) is gone: asyncio.run() owns the loop.
        await asyncio.wait([asyncio.create_task(download(u)) for u in url_list])
    asyncio.run(_main())
# ------------------------------------------------------------
# 15. Async iterators.
# A (sync) iterator implements __iter__ and __next__;
# an async iterator implements __aiter__ and an async __anext__.
import asyncio
class Reader:
    def __init__(self):
        self.count = 0  # how many "lines" have been produced so far
    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None  # sentinel meaning "no more lines"
        return self.count
    def __aiter__(self):
        return self
    async def __anext__(self):
        val = await self.readline()
        if val is None:
            # Bug fix: async iteration must end with StopAsyncIteration.
            # Raising StopIteration inside a coroutine is turned into a
            # RuntimeError by Python (PEP 479/525), crashing the async for.
            raise StopAsyncIteration
        return val
async def main():
    obj = Reader()
    async for item in obj:
        print(item)
asyncio.run(main())
# 16. Async context managers.
# A (sync) context manager implements __enter__ / __exit__;
# the async version implements __aenter__ / __aexit__.
import asyncio
class AsyncContextManager:
    def __init__(self):
        self.conn = None  # placeholder for a database connection
    async def do_something(self):
        # pretend to query the database asynchronously
        return 666
    async def __aenter__(self):
        # pretend to open the database connection
        self.conn = await asyncio.sleep(1)
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Bug fix: __aexit__ must accept the exception triple; the original
        # one-argument signature raised TypeError when the `async with`
        # block was exited.
        # pretend to close the database connection
        await asyncio.sleep(1)
async def main():
    async with AsyncContextManager() as f:
        results = await f.do_something()
        print(results)
asyncio.run(main())
# ------------------------------------------------------------
# 17.uvloop是asyncio的替代方案,性能高于asyncio
# import asyncio
# import uvloop
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 下面的写法同asyncio
#
# asgi?
# 答: 异步网关协议接口,一个介于网络协议服务和 Python 应用之间的标准接口,能够处理多种通用的协议类型,包括 HTTP,HTTP2 和 WebSocket。
# 比wsgi接口快
#
# 请简单介绍下 Uvicorn
# 答:Uvicorn 是基于 uvloop 和 httptools 构建的非常快速的 ASGI 服务器。目前,Python 仍缺乏异步的网关
# 协议接口,ASGI 的出现填补了这一空白,现在开始,我们能够使用共同的标准为所有的异步框架来实现一些
# 工具,ASGI 帮助 Python 在 Web 框架上和 Node.JS 及 Golang 相竞争,目标是获得高性能的 IO 密集型任
# 务,ASGI 支持 HTTP2 和 WebSockets,WSGI 是不支持的。
# -------------------------------------------------------------------
# 18.异步操作数据库
# aioredis库和aiomysql库
# --------------------------------------------------------------------------
# 19.fastapi异步网络接口框架,内部基于uvloop,速度快.比较新的框架,Django3也支持异步,非常好用
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
# FastAPI demo: one plain synchronous endpoint and one async endpoint backed
# by a shared Redis connection pool.
app=FastAPI()
# NOTE(review): ConnectionsPool and hmset_dict are aioredis 1.x APIs; the
# 2.x rewrite removed both — confirm the pinned aioredis version.
REDIS_POOL=aioredis.ConnectionsPool("redis://127.0.0.1:6379",password="123456",minsize=1,maxsize=10)
@app.get("/")
def index():
    """Plain synchronous endpoint."""
    return {"message":"Hello world"}
@app.get("/red")
async def red():
    """Async endpoint: writes then reads a Redis hash via the shared pool."""
    print("请求来了")
    await asyncio.sleep(3)
    #start a connection (borrow one connection from the pool)
    conn=await REDIS_POOL.acquire()
    redis=Redis(conn)
    #set value
    await redis.hmset_dict("car",key1=1,key2=2,key3=3)
    #read value
    result=await redis.hgetall("car",encoding="utf-8")
    #close it (return the connection to the pool, not actually closing it)
    REDIS_POOL.release(conn)
    return result
if __name__=="__main__":
    uvicorn.run(app,host="127.0.0.1",port=5000,log_level="info")
#-------------------------------------------------------------------------
# 19b. asyncio.gather returns results in submission order and supports
# nesting: a gather of gathers waits for every group.
import asyncio
from pprint import pprint
import random
async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag
# Fix: asyncio.get_event_loop() is deprecated when no loop is running —
# create a fresh loop and install it explicitly instead.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])
all_groups = asyncio.gather(group1, group2, group3)
results = loop.run_until_complete(all_groups)
loop.close()
print(results)
#--------------------------------------------------------------------------
# 20. asyncio.wait supports a timeout: tasks that miss it come back in the
# `unfinished` set. With return_when=FIRST_COMPLETED it returns as soon as
# one task finishes; the default ALL_COMPLETED waits for every task.
import asyncio
import random
async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag
# Fix: get_event_loop() is deprecated without a running loop, and wait()
# stopped accepting bare coroutines in Python 3.11 — install an explicit
# loop and wrap the coroutines as Futures bound to it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [asyncio.ensure_future(coro(i)) for i in range(1, 11)]
print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))
print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))
for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))
print("Get all other results:")
# Robustness fix: asyncio.wait raises ValueError on an empty set, which can
# happen here when everything finished within the 2-second window.
if unfinished2:
    finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))
    for task in finished3:
        print(task.result())
loop.close()
#21. aiohttp usage; adapted from https://www.cnblogs.com/wukai66/p/12632680.html
import aiohttp
import asyncio
import requests
from fake_useragent import UserAgent
from lxml import etree
ua = UserAgent()
headers = {"User-Agent": ua.random}
url1 = 'https://tieba.baidu.com/p/6398321305'
# Scrape every href/src from the page and keep absolute jpg/jpeg links.
res = requests.get(url1, headers=headers)
html = etree.HTML(res.text)
results = html.xpath("//@href|//@src")
urlss = []
for i in results:
    if "http" in i and ("jpg" in i or "jpeg" in i):
        urlss.append(i)
print(urlss)
import aiohttp, asyncio
async def main(pool):  # aiohttp must be driven from inside a coroutine
    sem = asyncio.Semaphore(pool)  # cap the number of concurrent requests
    # Fixes: asyncio.wait() rejects bare coroutines on Python 3.11+ — create
    # real Tasks; a plain list build also replaces the side-effect-only
    # [tasks.append(...)] comprehension. Guard against an empty scrape,
    # since wait() raises ValueError on an empty collection.
    tasks = [asyncio.create_task(control_sem(sem, i)) for i in urlss]
    if tasks:
        await asyncio.wait(tasks)
async def control_sem(sem, url):  # throttle via the semaphore
    async with sem:
        await fetch(url)
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            json = await resp.read()
            # save under a random numeric filename
            import random
            a = random.uniform(0, 1000)
            x = str(a) + ".jpeg"
            with open(x, 'wb+') as f:
                f.write(json)
asyncio.run(main(10))
#22. Pass the aiohttp.ClientSession down into fetch() so one session is
# reused per download rather than being re-created inside every request.
import aiohttp
import asyncio
import requests
from fake_useragent import UserAgent
from lxml import etree
ua = UserAgent()
headers = {"User-Agent": ua.random}
url1 = 'https://tieba.baidu.com/p/6398321305'
# Scrape every href/src from the page and keep absolute jpg/jpeg links.
res = requests.get(url1, headers=headers)
html = etree.HTML(res.text)
results = html.xpath("//@href|//@src")
urlss = []
for i in results:
    if "http" in i and ("jpg" in i or "jpeg" in i):
        urlss.append(i)
print(urlss)
import aiohttp, asyncio
async def main(pool):  # aiohttp must be driven from inside a coroutine
    sem = asyncio.Semaphore(pool)  # cap the number of concurrent requests
    # Fixes: asyncio.wait() rejects bare coroutines on Python 3.11+ — create
    # real Tasks; plain list build instead of a side-effect comprehension;
    # guard against an empty list (wait() raises ValueError on it).
    tasks = [asyncio.create_task(control_sem(sem, i)) for i in urlss]
    if tasks:
        await asyncio.wait(tasks)
async def control_sem(sem, url):  # throttle via the semaphore
    async with sem:
        async with aiohttp.ClientSession() as session:
            await fetch(url, session)
async def fetch(url, session):
    async with session.get(url) as resp:
        json = await resp.read()
        # save under a random numeric filename
        import random
        a = random.uniform(0, 1000)
        x = str(a) + ".jpeg"
        with open(x, 'wb+') as f:
            f.write(json)
# Fix: get_event_loop() is deprecated without a running loop — create and
# install one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(main(10))
# 网友评论 (stray footer text from the scraped blog page — kept as a comment so the file parses)