美文网首页
Python协程和异步编程

Python协程和异步编程

作者: lk_erzanml | 来源:发表于2021-01-19 08:34 被阅读0次
1.使用greenlet按顺序打印1234
from greenlet import greenlet

def func1():
    # Runs first: prints 1, then hands control to g2; when g2 switches
    # back, execution resumes here and prints 3.
    print(1)
    g2.switch()
    print(3)
    g2.switch()

def func2():
    # Entered by func1's first switch: prints 2, switches back to g1;
    # resumed by func1's second switch to print 4.
    print(2)
    g1.switch()
    print(4)


# greenlets are created suspended; nothing runs until switch() is called.
g1=greenlet(func1)
g2=greenlet(func2)

# Kick off func1; overall printed order is 1, 2, 3, 4.
g1.switch()
# -----------------------------------------------------------------------
# 2.gevent是一个自动切换耗时的协程库,而且现在一般都是用asyncio,gevent比greenlet高级点,因为可以自动切换耗时的io
# import gevent

# def task_1(num):
#     for i in range(num):
#         print(gevent.getcurrent(), i)
#         gevent.sleep(i)  # 模拟一个耗时操作,注意不能使用time模块的sleep
#
#
# if __name__ == "__main__":
#     g1 = gevent.spawn(task_1, 5)  # 创建协程
#     g2 = gevent.spawn(task_1, 6)
#     g3 = gevent.spawn(task_1, 7)
#
#     g1.join()  # 等待协程运行完毕
#     g2.join()
#     g3.join()
# ----------------------------------------------------------------
# 3.由于gevent是自动切换io,它修改了一些python标准库,所以需要给它打上补丁。
# from gevent import monkey
# import gevent
# import time
#
#
# def task_1(name):
#     for i in range(5):
#         print(name, i)
#         time.sleep(1)  # 协程遇到耗时操作后会自动切换其他协程运行
#
#
# def task_2(name):
#     for i in range(3):
#         print(name, i)
#         time.sleep(1)
#
#
# if __name__ == "__main__":
#     monkey.patch_all()  # 给所有的耗时操作打上补丁
#
#     gevent.joinall([  # 等到协程运行完毕
#         gevent.spawn(task_1, "task_1"),  # 创建协程
#         gevent.spawn(task_2, "task_2")
#     ])
#     print("the main thread!")
# -------------------------------------------------------------------
# 4.补充一个知识点,关于partial函数
# 需要注意的是,传中间的参数给partial会出错,感觉没啥卵用
# from functools import partial
#
# def hello_world(a,b,c):
#     print("a",a)
#     print("b",b)
#     print("c",c)
#
# hello_world(1,2,3)
# partial_hello_world=partial(hello_world,c=4)
# partial_hello_world(6,5)#这就是c这个参数被固定住了。
# ---------------------------------------
# 5.用yield实现协程,打印1234
def func1():
    """Generator yielding 1, 2, 3, 4; the middle values come from func2."""
    yield 1
    yield from func2()  # delegate: the sub-generator supplies 2 and 3
    yield 4


def func2():
    """Sub-generator yielding 2 then 3."""
    yield 2
    yield 3

a=func1()
# Drive the generator by hand instead of a for-loop; behavior is identical.
while True:
    try:
        print(next(a))
    except StopIteration:
        break
# 生成器的send用法啊,运行一次生成器,并且给生成器赋一个值,注意第一个不能send,因为第一次生成器还没产生
#最后一个也不行,和next一样,会触发StopIteration
def generate():
    """Generator yielding 0..4; each yield also receives a value via send().

    The right side of ``xx = yield i`` runs first: the generator pauses at
    ``yield i``, and the argument of the next send() becomes ``xx`` when
    execution resumes. That is why the generator must be primed with
    ``send(None)`` (or ``next()``) before any real value can be sent.
    """
    i = 0
    while i < 5:
        print("我在这。。")
        xx = yield i
        print(xx)
        i += 1


g = generate()

print(g.send(None))  # first send must be None: the generator is not started yet
print(g.send("la"))
print(g.send("li"))
print(g.send("li"))
print(g.send("li"))
# After the fifth yield the generator is exhausted; one more send() raises
# StopIteration (just like next()), which the original left uncaught and
# therefore crashed the whole script. Catch it to demonstrate safely.
try:
    print(g.send("li"))
except StopIteration:
    print("generator exhausted")
# ----------------------------------------
# 6.最新的协程编程库asyncio
 import asyncio

 async def dayin():#被asyncio装饰的是协程对象
     print("in you")

 result=dayin()#这里并没有执行,而是把协程对象重新赋值了

 asyncio.run(result)#这里首先把协程对象加入事件循环,然后执行协程对象
# ------------------------------------------
# 7.await关键字,await后面可以等待,协程对象,task,future这三个对象
import asyncio

async def others():
    """Sleep 2s between two prints, then return "haha"."""
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "haha"


async def main():
    """await suspends main until others() finishes and yields its value."""
    print("this is the main func")
    res = await others()
    print(res)


# The original had inconsistent leading spaces (a one-space indent on the
# `async def main()` and `asyncio.run` lines), which is an IndentationError;
# indentation is normalized to standard 4-space blocks.
asyncio.run(main())
# ---------------------------------------------------------
#8.task,是用于并发执行task协程对象的;一旦创建了task,就会把task添加到事件循环中
import asyncio

async def others():
    """Worker coroutine: prints markers around a 2s sleep (returns None)."""
    print("start")
    await asyncio.sleep(2)
    print("end")


async def main():
    """Create two Tasks up front, then await them in creation order."""
    print("main start")
    # create_task schedules the coroutine on the running loop immediately
    task1 = asyncio.create_task(others())
    task2 = asyncio.create_task(others())
    await asyncio.sleep(1)
    print("task创建完成")
    outcome = [await t for t in (task1, task2)]
    for value in outcome:
        print(value)
    print("main end")


asyncio.run(main())
# ---------------------------------------------------------------------
# 9.上面的写法一般不用,下面的写法才正宗
import asyncio

async def ts(n):
    """Sleep 2s and return n, printing start/end markers."""
    print("I am ts {}, start!".format(n))
    await asyncio.sleep(2)
    print("I am ts {}, end!".format(n))
    return n


async def main():
    """Schedule two ts Tasks and collect their results via asyncio.wait."""
    print("main start")
    task_list = [asyncio.create_task(ts(num)) for num in (1, 2)]
    # timeout=None waits for every task; done/pending are sets of Tasks.
    done, pending = await asyncio.wait(task_list, timeout=None)
    for finished in done:
        print(finished.result())
    print("main end")


asyncio.run(main())
# -----------------------------------------------------------------
# 10.改变下形式,也行。就是不在main里面创建task
import asyncio

async def ts(n):
    """Sleep 2s and return n, printing start/end markers."""
    print("I am ts {}, start!".format(n))
    await asyncio.sleep(2)
    print("I am ts {}, end!".format(n))
    return n


async def _runner():
    """Create the Tasks inside the running loop, then wait for them.

    The original passed bare coroutine objects straight to asyncio.wait();
    that was deprecated in Python 3.8 and raises TypeError since 3.11.
    create_task() needs a running event loop, which only exists once
    asyncio.run() has started this coroutine — hence the wrapper.
    """
    task_list = [
        asyncio.create_task(ts(1)),
        asyncio.create_task(ts(2)),
    ]
    return await asyncio.wait(task_list, timeout=None)


done, pending = asyncio.run(_runner())
for i in done:
    print(i.result())
# --------------------------------------------------------------------------------------------------------------------
# 11.一个更底层的future对象,也就是await可以等待的其中一个对象,他可以创建一个空的Future对象,等待被赋值,一旦赋值,等待结束
import asyncio

async def others(fur):
    """Do some work, then fulfil the given Future with a result string."""
    print("the start!")
    await asyncio.sleep(5)
    fur.set_result("furture is hopeness!")
    print("the end!")
    return 123


async def main():
    """Demonstrate the low-level Future: awaiting it blocks until someone
    calls set_result() on it."""
    running_loop = asyncio.get_running_loop()
    fur = running_loop.create_future()
    # The task fills the future; by the time it is awaited the result is set.
    await running_loop.create_task(others(fur))
    print(await fur)


asyncio.run(main())
# _______________________________________________________________
#12.线程池和进程池,为什么要讲这个呢?因为他们也有Future对象,而且在某些情况下可以转换
#multiprocessing库也有进程池,不过concurrent有线程池
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func(value):
    """Simulated blocking work: sleep 1s, then print the value."""
    time.sleep(1)
    print(value)


# Use the executor as a context manager so the pool is shut down and its
# worker threads are joined on exit — the original never called shutdown(),
# leaving worker threads alive until interpreter exit.
with ThreadPoolExecutor(max_workers=5) as pool:
    # 或 pool = ProcessPoolExecutor(max_workers=5)
    for i in range(10):
        fut = pool.submit(func, i)
        print(fut)
        print(type(fut))  # <class 'concurrent.futures._base.Future'>
# -------------------------------------------------------------------------
# 13.下面讲的就是在某些情况下,项目不支持协程开发的时候的办法
from concurrent.futures import Future
import time
import asyncio
def fun1():
    """A blocking, non-async function (stand-in for a legacy library call)."""
    time.sleep(2)
    return "sb"


async def main():
    """Run the blocking function 10 times on the default thread pool.

    run_in_executor(None, fn) submits fn to the loop's default
    ThreadPoolExecutor and wraps the concurrent Future in an awaitable
    asyncio Future.
    """
    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine; get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    # Submit all jobs first, then await: the original awaited each future
    # immediately after submitting it, serializing the pool (~20s total
    # instead of the pool's concurrent ~2-4s). Output is the same ten "sb"s.
    futs = [loop.run_in_executor(None, fun1) for _ in range(10)]
    for fut in futs:
        print(await fut)


asyncio.run(main())
# ----------------------------------------------
# 14.这个例子就是,不支持异步协程的情况,requests不支持异步的,测试后感觉很好用
import asyncio
import requests
async def download(url):
    """Fetch url via blocking requests.get on the default thread pool and
    save the body to a file named after the last path segment."""
    print("开始下载了:",url)
    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print("下载完成")
    file_name = url.rsplit(r"/")[-1]
    with open(file_name, mode="wb") as file_object:
        file_object.write(response.content)


if __name__ == "__main__":
    url_list = [
    'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJ1bKyMmIcD4KABbCR_8uizsAALIGwOhI2UAFsJf372.jpg',
        'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJlbKyMmIVhpJAAow7jt3ircAALIGwPXXxwACjEG766.jpg',
        'https://desk-fd.zol-img.com.cn/t_s960x600c5/g5/M00/02/05/ChMkJlbKyMqINtb4AAcmGciRO9YAALIHAB850UAByYx992.jpg',
    ]

    async def _main():
        # asyncio.wait() rejects bare coroutine objects since Python 3.11;
        # gather() both schedules and awaits them on the running loop. The
        # original also fetched an event loop it never used — removed.
        await asyncio.gather(*(download(url) for url in url_list))

    asyncio.run(_main())
# ------------------------------------------------------------
# 15.扩充知识点,关于异步迭代器
# 什么是迭代器?内部实现了__iter__和__next__的类
# 什么是异步迭代器?内部实现了__aiter__和__anext的类
import asyncio
class Reader:
    """Toy async iterator: produces 1..99, then ends the async-for loop.

    An async iterator implements __aiter__ (returns self) and an async
    __anext__ that raises StopAsyncIteration when exhausted.
    """
    def __init__(self):
        self.count = 0  # number of lines "read" so far

    async def readline(self):
        # Simulated async read: next number, or None once 100 is reached.
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            # Must be StopAsyncIteration: the original raised StopIteration,
            # which inside a coroutine is converted to a RuntimeError
            # instead of cleanly terminating the `async for` loop.
            raise StopAsyncIteration
        return val

async def main():
    """Drive the async iterator to exhaustion, printing each value."""
    reader = Reader()
    async for item in reader:
        print(item)

asyncio.run(main())
# 16.异步上下文管理器
# 上下文管理器:类内实现了,__enter__和__exit__的类
# 异步的:实现了__aenter__和__aexit__
import asyncio
class AsyncContextManager:
    """Async context manager demo: `async with` calls __aenter__/__aexit__."""
    def __init__(self):
        self.conn = None  # placeholder for an async DB connection

    async def do_something(self):
        # Stand-in for an asynchronous database operation.
        return 666

    async def __aenter__(self):
        # Stand-in for asynchronously opening the connection.
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # __aexit__ must accept the exception triple (type, value,
        # traceback); the original omitted them, so leaving any
        # `async with` block raised TypeError.
        await asyncio.sleep(1)

async def main():
    """Open the async context manager and print what do_something yields."""
    async with AsyncContextManager() as mgr:
        print(await mgr.do_something())

asyncio.run(main())
# ------------------------------------------------------------
# 17.uvloop是asyncio的替代方案,性能高于asyncio
# import asyncio
# import uvloop
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 下面的写法同asyncio
#
# asgi?
# 答: 异步网关协议接口,一个介于网络协议服务和 Python 应用之间的标准接口,能够处理多种通用的协议类型,包括 HTTP,HTTP2 和 WebSocket。
# 比wsgi接口快
#
# 请简单介绍下 Uvicorn
# 答:Uvicorn 是基于 uvloop 和 httptools 构建的非常快速的 ASGI 服务器。目前,Python 仍缺乏异步的网关
# 协议接口,ASGI 的出现填补了这一空白,现在开始,我们能够使用共同的标准为所有的异步框架来实现一些
# 工具,ASGI 帮助 Python 在 Web 框架上和 Node.JS 及 Golang 相竞争,目标是获得高性能的 IO 密集型任
# 务,ASGI 支持 HTTP2 和 WebSockets,WSGI 是不支持的。
# -------------------------------------------------------------------
# 18.异步操作数据库
# aioredis库和aiomysql库
# --------------------------------------------------------------------------
# 19.fastapi异步网络接口框架,内部基于uvloop,速度快.比较新的框架,Django3也支持异步,非常好用
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app=FastAPI()

# NOTE(review): constructing ConnectionsPool directly at import time, and the
# hmset_dict / hgetall(encoding=...) calls below, are aioredis 1.x APIs that
# were removed in aioredis 2.x — confirm the pinned aioredis version.
REDIS_POOL=aioredis.ConnectionsPool("redis://127.0.0.1:6379",password="123456",minsize=1,maxsize=10)

@app.get("/")
def index():
    """Plain synchronous endpoint."""
    return {"message":"Hello world"}

@app.get("/red")
async def red():
    """Async endpoint: round-trips a hash through Redis."""
    print("请求来了")
    await  asyncio.sleep(3)
    #start a connection
    conn=await REDIS_POOL.acquire()
    redis=Redis(conn)

    #set value
    await redis.hmset_dict("car",key1=1,key2=2,key3=3)

    #read value
    result=await redis.hgetall("car",encoding="utf-8")

    #close it: return the connection to the pool (not a real close)
    REDIS_POOL.release(conn)
    return result

if __name__=="__main__":
    uvicorn.run(app,host="127.0.0.1",port=5000,log_level="info")
#-------------------------------------------------------------------------
#19.asyncio.gather可以按照顺序返回结果,而且支持多级分组
import asyncio
from pprint import pprint

import random


async def coro(tag):
    """Sleep a random 1-3s, then return the tag, printing enter/exit markers."""
    print(">", tag)
    delay = random.uniform(1, 3)
    await asyncio.sleep(delay)
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

# gather() aggregates awaitables into one awaitable whose result is the
# list of results in submission order — regardless of completion order.
group1 = asyncio.gather(*(coro("group 1.{}".format(idx)) for idx in range(1, 6)))
group2 = asyncio.gather(*(coro("group 2.{}".format(idx)) for idx in range(1, 4)))
group3 = asyncio.gather(*(coro("group 3.{}".format(idx)) for idx in range(1, 10)))

# gather() nests: the combined result is a list of the three group lists.
all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

print(results)
#--------------------------------------------------------------------------
#20.asyncio.wait可以设置超时时间,设置超时时间后,如果没完成结果会在unfinished里面
#如果设置return_when=FIRST_COMPLETED,循环一次后完成的结果在finished里面,没完成的在unfinished。
#默认return_when是ALL_COMPLETED,也就是等待所有协程都完成
import asyncio
import random


async def coro(tag):
    """Sleep a random 0.5-5s, then return the tag, printing markers."""
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

# asyncio.wait() refuses bare coroutine objects since Python 3.11, so wrap
# each one in a Task bound to this loop up front (the original passed the
# raw coroutines, a TypeError on modern Python).
tasks = [loop.create_task(coro(i)) for i in range(1, 11)]

print("Get first result:")
# FIRST_COMPLETED returns as soon as one task finishes; the others keep
# running and come back in the pending set.
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
# timeout does NOT cancel the still-pending tasks; they stay scheduled.
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
if unfinished2:
    finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))
else:
    # asyncio.wait() raises ValueError on an empty set; nothing left to wait.
    finished3, unfinished3 = set(), set()

for task in finished3:
    print(task.result())

loop.close()
#21.关于aiohttp的应用,这个博客园的大佬,源地址:https://www.cnblogs.com/wukai66/p/12632680.html
import aiohttp
import asyncio
import requests
from fake_useragent import UserAgent
from lxml import etree

# Synchronously scrape one Tieba page and collect every image URL it links.
ua=UserAgent()
headers={"User-Agent":ua.random}  # random browser UA to avoid a bot-looking default
url1='https://tieba.baidu.com/p/6398321305'
res=requests.get(url1,headers=headers)

# Pull every href/src attribute and keep only absolute jpg/jpeg links.
html=etree.HTML(res.text)
results=html.xpath("//@href|//@src")
urlss=[]
for i in results:
    if "http" in i and (
        "jpg" in i or "jpeg" in i):
        urlss.append(i)

print(urlss)


import aiohttp, asyncio

async def main(pool):  # aiohttp client calls must run inside a coroutine
    """Fan out one download per collected URL, at most `pool` at a time."""
    sem = asyncio.Semaphore(pool)  # cap on concurrent requests
    # asyncio.wait() rejects bare coroutine objects since Python 3.11, so
    # wrap each in a Task on the running loop; a plain list build replaces
    # the original side-effect list comprehension.
    tasks = [asyncio.create_task(control_sem(sem, url)) for url in urlss]
    if tasks:  # asyncio.wait() raises ValueError on an empty collection
        await asyncio.wait(tasks)

async def control_sem(sem, url):
    """Acquire the semaphore around a single fetch."""
    async with sem:
        await fetch(url)

async def fetch(url):
    """Download `url` and write the body to a randomly named .jpeg file."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            body = await resp.read()
            import random
            # NOTE(review): random-float filenames can collide in theory;
            # names derived from the URL would be safer.
            x = str(random.uniform(0, 1000)) + ".jpeg"
            with open(x, 'wb+') as f:
                f.write(body)


asyncio.run(main(10))
#22.async with aiohttp.ClientSession() as session 共用session,这样不至于一直,每个请求新建一个session
import aiohttp
import asyncio
import requests
from fake_useragent import UserAgent
from lxml import etree

# Synchronously scrape one Tieba page and collect every image URL it links.
ua=UserAgent()
headers={"User-Agent":ua.random}  # random browser UA to avoid a bot-looking default
url1='https://tieba.baidu.com/p/6398321305'
res=requests.get(url1,headers=headers)

# Pull every href/src attribute and keep only absolute jpg/jpeg links.
html=etree.HTML(res.text)
results=html.xpath("//@href|//@src")
urlss=[]
for i in results:
    if "http" in i and (
        "jpg" in i or "jpeg" in i):
        urlss.append(i)

print(urlss)


import aiohttp, asyncio

async def main(pool):  # aiohttp client calls must run inside a coroutine
    """Fan out one download per collected URL, at most `pool` at a time."""
    sem = asyncio.Semaphore(pool)  # cap on concurrent requests
    # asyncio.wait() rejects bare coroutine objects since Python 3.11, so
    # wrap each in a Task on the running loop; a plain list build replaces
    # the original side-effect list comprehension.
    tasks = [asyncio.create_task(control_sem(sem, url)) for url in urlss]
    if tasks:  # asyncio.wait() raises ValueError on an empty collection
        await asyncio.wait(tasks)

async def control_sem(sem, url):
    """Limit concurrency; open one ClientSession per download and share it
    with fetch (the point of this variant vs. example 21)."""
    async with sem:
        async with aiohttp.ClientSession() as session:
            await fetch(url, session)

async def fetch(url, session):
    """Download `url` through the given session, save to a random .jpeg."""
    async with session.get(url) as resp:
        body = await resp.read()
        import random
        x = str(random.uniform(0, 1000)) + ".jpeg"
        with open(x, 'wb+') as f:
            f.write(body)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(10))

相关文章

  • Gevent高并发网络库精解

    进程 线程 协程 异步 并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。 多进程编程在python...

  • 2018-03-12

    python异步与协程 异步编程: 异步I/O selet/poll/epoll 事件循环 + 回调共享状态管理困...

  • python异步协程(aiohttp,asyncio)

    python异步协程 环境:python3.7.0 协程 协程,英文叫做 Coroutine,又称微线程,纤程,协...

  • Tornado #2 异步并发(协程)实现

    Tornado本身提供了异步并发(协程)实现。 使用Python异步编程时,我们有多种选择。比如callback、...

  • Python协程和异步编程

  • Gevent

    前述 进程 线程 协程 异步 并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。 多进程编程在pyt...

  • Gevent简明教程

    前述 进程 线程 协程 异步 并发编程(不是并行)目前有四种方式:多进程、多线程、协程和异步。 多进程编程在pyt...

  • Python 异步:完整教程

    Asyncio 允许我们在 Python 中使用基于协程的并发异步编程。尽管 asyncio 已经在 Python...

  • 并发编程-协程

    协程greenlet模块 (gevent实现原理)gevent模块 (注册协程,实现异步编程) 协程的应用eg:...

  • python 协程和异步I/O的实践

    python 协程和异步I/O的实践 协程的概念 协程(coroutine)通常又称之为微线程或纤程,它是相互协作...

网友评论

      本文标题:Python协程和异步编程

      本文链接:https://www.haomeiwen.com/subject/oanaaktx.html